我正在尝试将数据从 SQL SERVER 加载到 GCP Bigquery uisng Dataflow。运行管道时出现以下错误:
Error Loading DataFrame to BigQuery Table (pyarrow.lib.ArrowTypeError: object of type <class 'str'> cannot be converted to int [while running 'Processing-ptransform-33'] )
SQL SERVER表的Schema :
unique_id INTEGER
scheme_id STRING
member_unique_id INTEGER
portfolio_code INTEGER
operation_date DATETIME
employer_unique_id INTEGER
group_code INTEGER
activity_group_code INTEGER
contribution_month STRING
summary STRING
activity_group_description STRING
rsa_transaction_date DATETIME
debit_credit_code INTEGER
accounting_date DATETIME
valuation_date DATETIME
rsa_value FLOAT
rsa_units FLOAT
unit_price FLOAT
数据流管道脚本:
import apache_beam as beam
import os
import argparse
import logging
import pandas as pd
import datetime
from google.cloud import bigquery
import pytz
from oauth2client.client import GoogleCredentials
from datetime import datetime,date,timedelta
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from google.cloud import bigquery
class setenv(beam.DoFn):
def process(self,context,df_Bucket):
import jaydebeapi
import pandas as pd
src1='gs://'+df_Bucket+'/JAVA_JDK_AND_JAR'
os.system('gsutil cp '+src1+'/mssql-jdbc-10.2.1.jre8.jar /tmp/' +'&&'+ 'gsutil cp -r '+src1+'/jdk-8u202-linux-x64.tar.gz /tmp/')
logging.info('Jar copied to Instance..')
logging.info('Java Libraries copied to Instance..')
os.system('mkdir -p /usr/lib/jvm && tar zxvf /tmp/jdk-8u202-linux-x64.tar.gz -C /usr/lib/jvm && update-alternatives --install "/usr/bin/java" "java" "/usr/lib/jvm/jdk1.8.0_202/bin/java" 1 && update-alternatives --config java')
logging.info('Enviornment Variable set.')
return list("1")
class readandwrite(beam.DoFn):
def process(self, context, conn_Detail):
import jaydebeapi
import pandas as pd
from datetime import datetime,date,timedelta
from google.cloud import bigquery
DatabaseConn=conn_Detail.split("~|*")
database_user=DatabaseConn[0]
database_password=DatabaseConn[1]
database_host=DatabaseConn[2]
database_port=DatabaseConn[3]
database_db=DatabaseConn[4]
jclassname = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
url = ("jdbc:sqlserver://"+database_host+":"+database_port+";databaseName="+database_db+";encrypt=false")
logging.info(url)
jars = "/tmp/mssql-jdbc-10.2.1.jre8.jar"
libs = None
cnx = jaydebeapi.connect(jclassname,url,{'user':database_user,'password':database_password},jars=jars)
logging.info('Connection Successful..')
cursor = cnx.cursor()
logging.info('Query submitted to SQL Server Database..')
logging.info('printing data')
tables='CDC_STG_RSA_MEMBER_ACCOUNT'
query="select * from {}".format(tables)
sql_query = pd.read_sql(query, cnx)
df = pd.DataFrame(sql_query,index=None)
client = bigquery.Client()
table_id="myproject.POC.BTCH_STG_MSSQL_ACCOUNT"
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("unique_id", bigquery.enums.SqlTypeNames.INTEGER),
bigquery.SchemaField("scheme_id", bigquery.enums.SqlTypeNames.STRING),
bigquery.SchemaField("member_unique_id",bigquery.enums.SqlTypeNames.INTEGER),
bigquery.SchemaField("portfolio_code",bigquery.enums.SqlTypeNames.INTEGER),
bigquery.SchemaField("operation_date", bigquery.enums.SqlTypeNames.DATETIME),
bigquery.SchemaField("employer_unique_id", bigquery.enums.SqlTypeNames.INTEGER),
bigquery.SchemaField("group_code",bigquery.enums.SqlTypeNames.INTEGER),
bigquery.SchemaField("activity_group_code",bigquery.enums.SqlTypeNames.INTEGER),
bigquery.SchemaField("contribution_month", bigquery.enums.SqlTypeNames.STRING),
bigquery.SchemaField("summary", bigquery.enums.SqlTypeNames.STRING),
bigquery.SchemaField("activity_group_description",bigquery.enums.SqlTypeNames.STRING),
bigquery.SchemaField("rsa_transaction_date",bigquery.enums.SqlTypeNames.DATETIME),
bigquery.SchemaField("debit_credit_code",bigquery.enums.SqlTypeNames.INTEGER),
bigquery.SchemaField("accounting_date",bigquery.enums.SqlTypeNames.DATETIME),
bigquery.SchemaField("valuation_date",bigquery.enums.SqlTypeNames.DATETIME),
bigquery.SchemaField("rsa_value",bigquery.enums.SqlTypeNames.FLOAT64),
bigquery.SchemaField("rsa_units",bigquery.enums.SqlTypeNames.FLOAT64),
bigquery.SchemaField("unit_price",bigquery.enums.SqlTypeNames.FLOAT64)
],
write_disposition="WRITE_TRUNCATE",
)
job = client.load_table_from_dataframe(
df, table_id, job_config=job_config
) # Make an API request.
job.result()
def run():
try:
parser = argparse.ArgumentParser()
parser.add_argument(
'--dfBucket',
required=True,
help= ('Bucket where JARS/JDK is present')
)
parser.add_argument(
'--connDetail',
required=True,
help= ('Source Database Connection Detail')
)
known_args, pipeline_args = parser.parse_known_args()
global df_Bucket
df_Bucket = known_args.dfBucket
global conn_Detail
conn_Detail = known_args.connDetail
pipeline_options = PipelineOptions(pipeline_args)
pcoll = beam.Pipeline(options=pipeline_options)
logging.info("Pipeline Starts")
dummy= pcoll | 'Initializing..' >> beam.Create(['1'])
dummy_env = dummy | 'Setting up Instance..' >> beam.ParDo(setenv(),df_Bucket)
readrecords=(dummy_env | 'Processing' >> beam.ParDo(readandwrite(), conn_Detail))
#Write_To_GCS = (readrecords | 'WriteToGCS' >> beam.io.WriteToText("gs://gcp01-sb-krnospbi-dataflow-bucket/source_data/CMSPRINCIPALEMPLOYER.txt"))
p=pcoll.run()
logging.info('Job Run Successfully!')
p.wait_until_finish()
except:
logging.exception('Failed to launch datapipeline')
raise
if __name__ == '__main__':
#parser = argparse.ArgumentParser(description=__doc__ , formatter_class=argparse.RawDescriptionHelpFormatter)
run()
需要一些输入来解决问题