Error loading a DataFrame into a BigQuery table (pyarrow.lib.ArrowTypeError: object of type <class 'str'> cannot be converted to int)


I am trying to load data from SQL Server into GCP BigQuery using Dataflow. When I run the pipeline, I get the following error:

Error Loading DataFrame to BigQuery Table (pyarrow.lib.ArrowTypeError: object of type <class 'str'> cannot be converted to int [while running 'Processing-ptransform-33'] )
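My understanding (not confirmed) is that this error is raised while pyarrow converts the DataFrame: a column that the load schema declares as INTEGER still holds Python strings, and columns read over JDBC with pandas.read_sql often come back as object dtype. A minimal diagnostic sketch, assuming df is the DataFrame built in the pipeline below and using the column names from the schema:

import pandas as pd

# Diagnostic sketch only: `df` is assumed to be the frame returned by pd.read_sql()
# in the pipeline below; the column names come from the table schema.
print(df.dtypes)  # INTEGER columns that show up as `object` are the likely suspects

int_cols = ["unique_id", "member_unique_id", "portfolio_code", "employer_unique_id",
            "group_code", "activity_group_code", "debit_credit_code"]
for col in int_cols:
    non_numeric = df[pd.to_numeric(df[col], errors="coerce").isna() & df[col].notna()]
    if not non_numeric.empty:
        print(col, "has non-numeric values, e.g.:", non_numeric[col].head().tolist())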

Schema of the SQL Server table (my assumed pandas dtype mapping follows the listing):

unique_id                    INTEGER
scheme_id                    STRING
member_unique_id             INTEGER
portfolio_code               INTEGER
operation_date               DATETIME
employer_unique_id           INTEGER
group_code                   INTEGER
activity_group_code          INTEGER
contribution_month           STRING
summary                      STRING
activity_group_description   STRING
rsa_transaction_date         DATETIME
debit_credit_code            INTEGER
accounting_date              DATETIME
valuation_date               DATETIME
rsa_value                    FLOAT
rsa_units                    FLOAT
unit_price                   FLOAT
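For reference, my working assumption (not taken from any documentation, so it may be off) about which pandas dtypes convert cleanly to these BigQuery types when load_table_from_dataframe hands the frame to pyarrow:

# Assumed mapping only, for sanity-checking df.dtypes against the schema above.
expected_pandas_dtypes = {
    "INTEGER":  "int64 or nullable Int64",
    "FLOAT":    "float64",
    "DATETIME": "datetime64[ns] (timezone-naive)",
    "STRING":   "object / string",
}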

The Dataflow pipeline script (a dtype-coercion sketch I am considering follows the full script):

import apache_beam as beam
import os
import argparse
import logging
import pandas as pd
import datetime
from google.cloud import bigquery
import pytz
from oauth2client.client import GoogleCredentials
from datetime import datetime, date, timedelta
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
# Copies the JDBC driver and a JDK from the GCS bucket onto the worker and
# installs Java so that jaydebeapi can reach SQL Server from the Dataflow worker.
class setenv(beam.DoFn):
      def process(self,context,df_Bucket):
          import jaydebeapi
          import pandas as pd
          src1='gs://'+df_Bucket+'/JAVA_JDK_AND_JAR'
          os.system('gsutil cp '+src1+'/mssql-jdbc-10.2.1.jre8.jar /tmp/' +'&&'+ 'gsutil cp -r '+src1+'/jdk-8u202-linux-x64.tar.gz /tmp/')
          logging.info('Jar copied to Instance..')
          logging.info('Java Libraries copied to Instance..')
          os.system('mkdir -p /usr/lib/jvm  && tar zxvf /tmp/jdk-8u202-linux-x64.tar.gz -C /usr/lib/jvm  && update-alternatives --install "/usr/bin/java" "java" "/usr/lib/jvm/jdk1.8.0_202/bin/java" 1 && update-alternatives --config java')
          logging.info('Environment Variable set.')
          return list("1")
# Reads the whole SQL Server table into a pandas DataFrame over JDBC and loads
# it into the BigQuery table with an explicit schema.
class readandwrite(beam.DoFn):
      def process(self, context, conn_Detail):
        import jaydebeapi
        import pandas as pd
        from datetime import datetime,date,timedelta
        from google.cloud import bigquery
        DatabaseConn=conn_Detail.split("~|*")
        database_user=DatabaseConn[0]
        database_password=DatabaseConn[1]
        database_host=DatabaseConn[2]
        database_port=DatabaseConn[3]
        database_db=DatabaseConn[4]          
        jclassname = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        url = ("jdbc:sqlserver://"+database_host+":"+database_port+";databaseName="+database_db+";encrypt=false")
        logging.info(url)
        jars = "/tmp/mssql-jdbc-10.2.1.jre8.jar"
        libs = None
        cnx = jaydebeapi.connect(jclassname,url,{'user':database_user,'password':database_password},jars=jars)  
        logging.info('Connection Successful..') 
        cursor = cnx.cursor()
        logging.info('Query submitted to SQL Server Database..')
        logging.info('printing data')
        tables='CDC_STG_RSA_MEMBER_ACCOUNT'
        query="select * from {}".format(tables)
        df = pd.read_sql(query, cnx)  # read the whole table into a DataFrame
        client = bigquery.Client()
        table_id="myproject.POC.BTCH_STG_MSSQL_ACCOUNT"
        job_config = bigquery.LoadJobConfig(
            schema=[
                bigquery.SchemaField("unique_id", bigquery.enums.SqlTypeNames.INTEGER),
                bigquery.SchemaField("scheme_id", bigquery.enums.SqlTypeNames.STRING),
                bigquery.SchemaField("member_unique_id", bigquery.enums.SqlTypeNames.INTEGER),
                bigquery.SchemaField("portfolio_code", bigquery.enums.SqlTypeNames.INTEGER),
                bigquery.SchemaField("operation_date", bigquery.enums.SqlTypeNames.DATETIME),
                bigquery.SchemaField("employer_unique_id", bigquery.enums.SqlTypeNames.INTEGER),
                bigquery.SchemaField("group_code", bigquery.enums.SqlTypeNames.INTEGER),
                bigquery.SchemaField("activity_group_code", bigquery.enums.SqlTypeNames.INTEGER),
                bigquery.SchemaField("contribution_month", bigquery.enums.SqlTypeNames.STRING),
                bigquery.SchemaField("summary", bigquery.enums.SqlTypeNames.STRING),
                bigquery.SchemaField("activity_group_description", bigquery.enums.SqlTypeNames.STRING),
                bigquery.SchemaField("rsa_transaction_date", bigquery.enums.SqlTypeNames.DATETIME),
                bigquery.SchemaField("debit_credit_code", bigquery.enums.SqlTypeNames.INTEGER),
                bigquery.SchemaField("accounting_date", bigquery.enums.SqlTypeNames.DATETIME),
                bigquery.SchemaField("valuation_date", bigquery.enums.SqlTypeNames.DATETIME),
                bigquery.SchemaField("rsa_value", bigquery.enums.SqlTypeNames.FLOAT64),
                bigquery.SchemaField("rsa_units", bigquery.enums.SqlTypeNames.FLOAT64),
                bigquery.SchemaField("unit_price", bigquery.enums.SqlTypeNames.FLOAT64),
            ],
            write_disposition="WRITE_TRUNCATE",
        )
        job = client.load_table_from_dataframe(
            df, table_id, job_config=job_config
        )  # Make an API request.
        job.result()
          
def run():    

    try: 
        parser = argparse.ArgumentParser()
    
        parser.add_argument(
            '--dfBucket',
            required=True,
            help= ('Bucket where JARS/JDK is present')
            )
        parser.add_argument(
            '--connDetail',
            required=True,
            help= ('Source Database Connection Detail')
            )
   
        known_args, pipeline_args = parser.parse_known_args()
    
        global df_Bucket 
        df_Bucket = known_args.dfBucket
        global conn_Detail 
        conn_Detail = known_args.connDetail
        pipeline_options = PipelineOptions(pipeline_args)
        pcoll = beam.Pipeline(options=pipeline_options)
        logging.info("Pipeline Starts")
        dummy= pcoll | 'Initializing..' >> beam.Create(['1'])
        dummy_env = dummy | 'Setting up Instance..' >> beam.ParDo(setenv(),df_Bucket)
        readrecords=(dummy_env | 'Processing' >>  beam.ParDo(readandwrite(), conn_Detail))
        #Write_To_GCS = (readrecords | 'WriteToGCS' >>  beam.io.WriteToText("gs://gcp01-sb-krnospbi-dataflow-bucket/source_data/CMSPRINCIPALEMPLOYER.txt"))
        p = pcoll.run()
        p.wait_until_finish()
        logging.info('Job ran successfully!')
    except:
        logging.exception('Failed to launch datapipeline')
        raise    
if __name__ == '__main__':
    #parser = argparse.ArgumentParser(description=__doc__ , formatter_class=argparse.RawDescriptionHelpFormatter)
    run()
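One approach I am considering, but have not verified, is to coerce the DataFrame columns to the dtypes the load schema expects before calling load_table_from_dataframe. A minimal sketch (column names taken from the schema above; errors="coerce" turns unparseable values into NaN/NaT, which may not be acceptable):

import pandas as pd

# Sketch only: force the DataFrame dtypes to line up with the BigQuery load schema.
int_cols = ["unique_id", "member_unique_id", "portfolio_code", "employer_unique_id",
            "group_code", "activity_group_code", "debit_credit_code"]
float_cols = ["rsa_value", "rsa_units", "unit_price"]
datetime_cols = ["operation_date", "rsa_transaction_date", "accounting_date", "valuation_date"]

for col in int_cols:
    df[col] = pd.to_numeric(df[col], errors="coerce").astype("Int64")  # nullable integer
for col in float_cols:
    df[col] = pd.to_numeric(df[col], errors="coerce").astype("float64")
for col in datetime_cols:
    df[col] = pd.to_datetime(df[col], errors="coerce")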

I need some input to resolve this issue.

python-3.x pandas google-cloud-platform google-bigquery google-cloud-dataflow