我无法通过 Cloud Composer 触发的 Dataflow 脚本连接到存储在 GCS 存储桶中的 jdbc 驱动程序(jar 文件)。我有一个作曲家脚本和一个数据流脚本。
Composer 脚本: 触发数据流脚本运行。 (在 Composer 存储桶的 Dags 文件夹中)
Dataflow python 脚本: 连接到 CloudSQL 中的 MySQL 以将数据读取和写入 Bigquery。 MySQL Jar 文件:
mysql-connector-j-8.2.0.jar
(两者都存储在 Composer 存储桶的存储桶位置)。
代码使用
apache_beam.io.jdbc.ReadFromJdbc
读取MySQL表。 classpath
数据流脚本:
"""Cloud sql mysql to bigquery"""
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.jdbc import ReadFromJdbc
def run(argv=None):
pipeline_options = PipelineOptions()
gcp_options = pipeline_options.view_as(GoogleCloudOptions)
gcp_options.project = 'project-name'
with beam.Pipeline(options=pipeline_options) as p:
jdbc_url = (
"jdbc:mysql://someIPhere:3306/dbname"
)
classpath='gs://us-central1-viknesh-compose-3121212a-bucket/mysql-connector-j-8.2.0.jar'
query = 'SELECT * FROM Persons'
output_table = 'project-name:datasets_us_cent1.new_test_mysql_cloudsql'
sql_data = p | 'Read from Cloud SQL' >> ReadFromJdbc(
query=query,
jdbc_url=jdbc_url,
table_name='new_test_mysql_cloudsql',
username='root',
password='mypassword',
driver_class_name='com.mysql.cj.jdbc.Driver',
classpath=classpath
)
sql_data | 'Write to BigQuery' >> WriteToBigQuery(
table=output_table,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
)
if __name__ == '__main__':
run()
我的composer dag代码可以触发数据流代码。 我收到一个错误,指出数据流代码无法解析位置(gcs 路径)。有多个警告和错误。
警告:(对于路径中的每个字母)它显示警告
WARNING:root:Unable to parse g into group:artifact:version.
错误:
ValueError: unknown url type: 'g'
我尝试了多种方法来设置服务帐户权限和读取 Maven 页面,但没有得到解决。请帮忙!
https://beam.apache.org/releases/pydoc/current/apache_beam.io.jdbc.html
classpath – 要包含在扩展服务的类路径中的 JAR 或 Java 包的列表。 jdbc 通常需要此选项来包含额外的 JDBC 驱动程序包。包可以采用以下三种格式:(1) 本地文件,(2) URL,(3) Maven 包的 gradle 样式标识符(例如“org.postgresql:postgresql:42.3.1”)。默认情况下,此参数包含 Postgres SQL JDBC 驱动程序。
你可以尝试吗
classpath=[classpath]
?