Python xlsx编写器未创建文件,但日志记录表明正在创建该文件,并且没有错误消息

问题描述 投票:0回答:1

我有一个Python脚本,该脚本可连接到内部数据库,检索数据,然后将数据写入Excel工作簿。

体系结构是:

Python script
utils file
configuration file

这里是代码:

Python脚本:

import argparse
import sys
import traceback
import datetime
import utils
import xlsxwriter
from os import path
from company.log import log, LogLevel, register_source  ### "company" is an internal library

LOGSOURCE = utils.LOGSOURCE

def run(container, startDate, endDate):
    """
    Execute each query defined in DataRetrieval.cfg against the connected container and output the results.
    """
    queries = utils.get_queries(container)
    timeframeClause = utils.define_timeframe(startDate, endDate)
    workbookName = path.join(container.outputDirectory,'{}-Data.xlsx'.format(container.system, container.container))
    log(LOGSOURCE, LogLevel.INFO, 'run', 'Creating workbook: {}'.format(workbookName))
    workbook = xlsxwriter.Workbook(workbookName)
    queries_ws = utils.create_queries_ws(workbook)
    for ind, queryName in enumerate(queries):
        log(LOGSOURCE, LogLevel.INFO, 'run', 'Preparing query: {}'.format(queryName))
        query = utils.restrict_query_timeframe(queries[queryName], timeframeClause)
        header = utils.get_column_names(query)
        worksheet = workbook.add_worksheet(queryName)
        worksheet.write_row(0, 0, header)
        log(LOGSOURCE, LogLevel.INFO, 'run', 'Executing query: {}'.format(query))
        recordSet = container.cc.execute_query(query)
        log(LOGSOURCE, LogLevel.INFO, 'run', 'Writing query results to worksheet {}'.format(queryName))
        utils.write_to_excel(worksheet, recordSet)
        queries_ws.write(ind+1, 0, queryName)
        queries_ws.write(ind+1, 1, query)

def main():
    """
    Run the data retrieval generator.
    """
    startTime = datetime.datetime.now()
    parser = argparse.ArgumentParser(description='Simple script for exporting the result of queries into csv files. Queries are defined in the [Stats] section of the configuration file, and are resrticted to the user provided time frame')
    parser.add_argument('config', help='The configuration file defining the queries and login information for this process.')
    parser.add_argument('startDate', help='Date from which to begin query (inclusive). Format: mm/dd/yy')
    parser.add_argument('-e', '--endDate', help='Date to run query through (inclusive), defaults to today. Format: mm/dd/yy')
    args = parser.parse_args()

    cc = None
    try:
        cc = utils.dBlink(args.config)
        run(cc, args.startDate, args.endDate)
        log(LOGSOURCE, LogLevel.INFO, 'main', 'Completed in: {}'.format(datetime.datetime.now() - startTime))
    except Exception as e:
        print (e)
        log(LOGSOURCE, LogLevel.FATAL, 'main', traceback.format_exc())
        sys.exit(1)
    finally:
        if cc:
            cc.disconnect()
        sys.exit(0)

if __name__ == "__main__":
    main()

实用程序文件:

import re
from datetime import datetime
from company import dB
from company.log import log, LogLevel, register_source

LOGSOURCE = register_source('dataretrieval')

class dBlink(object):
    """A dB interface object."""

    def __init__(self, config):
        """Initialize and connect to dB."""
        self.section = 'DATA'
        self.cc = dB.ContainerClient(config, autologin=True)
        self.settings = dB.Settings(config)
        self.system = self.settings.get('RUNTIME', 'SYSTEM', required=True)
        self.container = self.settings.get('RUNTIME', 'CONTAINER', required=True)
        self.outputDirectory = self.settings.get(self.section, 'OutputDir', required=True)
        log(LOGSOURCE, LogLevel.INFO, '__init__', 'Successfully connected to dB: {}.{}'.format(self.system, self.container))

    def disconnect(self):
        """Disconnect from the dB."""
        if self.cc and self.cc.is_in_session():
            self.cc.session_shutdown()

def get_queries(container):
    """Returns all enumerated queries as a dictionary with the query names as keys"""
    queries = dict()
    for i in range(0,1000):
        query = container.settings.get(container.section, 'Query{}'.format(i))
        if query is not None and '' != query:
            name, query = query.split(':')
            queries[name] = query
            log(LOGSOURCE, LogLevel.INFO, 'get_queries', 'Found query: {}'.format(query))
    return queries

def define_timeframe(startDate, endDate):
    """Returns the datetime restriction to be appended to each query. Currently uses change_timestamp w/ alias 'i'"""
    start = datetime.strptime(startDate, '%m/%d/%y').date()
    if endDate:
        end = datetime.strptime(endDate, '%m/%d/%y').date()
    else:
        end = datetime.now().date()
    clause = "AND i.change_timestamp > $DATETIME('{}') and i.change_timestamp < $DATETIME('{}')".format(start,end)
    return clause

def restrict_query_timeframe(query, clause):
    """Add clause to filter by change_timestamp"""
    # Need to seperate out any order bys, case insensitive, so need to normalize
    order_by = re.compile(re.escape('order by'), re.IGNORECASE)
    query = order_by.sub('ORDER BY', query)
    if 'ORDER BY' in query:
        statement, order_criteria = query.split('ORDER BY')
        order_criteria = " ORDER BY {} ".format(order_criteria) 
    else:
        order_criteria = ''
        statement = query
    return "{} {} {}".format(statement, clause, order_criteria)

def get_column_names(query):
    """Return property names for the query as a list"""
    fromClause = re.compile(re.escape('from'), re.IGNORECASE)
    query = fromClause.sub('FROM', query)
    columns = query.split('FROM')[0]
    columnsList = columns.split(',')
    columnsList[0] = columnsList[0].split(' ')[-1]
    return columnsList

def write_to_excel(ws, recordSet):
    """Write each record in the recordset to a row in an excel worksheet."""
    row = 1
    for record in recordSet:
        ws.write_row(row, 0, record)
        row += 1

def create_queries_ws(wb):
    """Create 'Queries' worksheet that maps each worksheet to the query that generated its contents."""
    queries_ws = wb.add_worksheet('Queries')
    header_format = wb.add_format({'bold': True, 'underline': True})
    queries_ws.write(0, 0, "WorkSheet Name", header_format)
    queries_ws.write(0, 1, "Query", header_format)
    return queries_ws

配置文件:

[DATA]
OutputDir=C:/Users/me/Desktop

Query0=Orders:SELECT TOP 5 i.InternalOrderId FROM Orders i where i.OrderType='W' 

[RUNTIME]
SYSTEM=Production
CONTAINER=Sales

[STATS]
Enabled=true

[LOGGING]
LOGFILE=C:/Users/me/Desktop/logs/Stats-%RUNTIME:SYSTEM%_%RUNTIME:CONTAINER%.log
LISTENERS=INFO,TSV

TSV.SOURCES=dataretrieval
TSV.TARGETS=std::cout,%LOGFILE%
TSV.LEVEL=dbug

日志记录表明正在创建文件。但是,对计算机的搜索显示该文件不存在。并且,没有错误消息。

[奇怪的是,这段代码可以在我的同事的机器上运行(他正在运行Python 2.7),但不能在我的机器上工作。我认为这不是版本问题。

Windows版本10

Python版本:3.7.7

xlsxwriter版本:1.2.8

有人在这里看到这个问题吗?预先感谢!

python xlsxwriter
1个回答
0
投票

编辑:解决方案是卸载xlsxwriter的1.2.8版本并安装0.9.6。

© www.soinside.com 2019 - 2024. All rights reserved.