I am trying to copy a file from a GCS bucket to the /tmp location in Dataflow. For this, I tried the code below -
import apache_beam as beam
....
....
class copyFile(beam.DoFn):
    def __init__(self, gcs_path):
        self.gcs_path = gcs_path

    def process(self, element):
        with beam.io.gcp.GcsIO().open(self.gcs_path, 'rb') as src, \
                open('/tmp/my_file.jar', 'wb') as dest:
            dest.write(src.read())
        # Log a success message
        print('File copied to /tmp.')
        yield element
This code works fine when I run it with the Direct Runner, but when I run it with the Dataflow Runner it throws the error -
NameError: name 'beam' is not defined
I don't understand why. Could someone please help me with this?
Hi AInguva, XQ Hu, thanks for your replies. I have pasted the full code I tried here. As suggested, I also added the save_main_session = True option. However, it still does not work with the "DataflowRunner".
import apache_beam as beam
import os
import subprocess
import jaydebeapi
import pandas as pd
from google.cloud import bigquery
import logging
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, WorkerOptions

print("Starting the script")

class CopyJAR(beam.DoFn):
    def __init__(self, file_path):
        self.file_path = file_path

    def process(self, element):
        with beam.io.gcp.GcsIO().open(self.file_path, 'rb') as src, \
                open('/tmp/mysql-connector-j-8.2.0.jar', 'wb') as dest:
            dest.write(src.read())
        print('File copied.')
        return list("1")

class ReadFromDB(beam.DoFn):
    def process(self, element):
        jdbc_url = 'jdbc:mysql://public_ip:3306/my_db'
        username = 'user'
        password = 'password'
        sql_query = 'select col1,col2 from my_table;'
        connection = jaydebeapi.connect(
            'com.mysql.cj.jdbc.Driver',
            jdbc_url,
            [username, password],
            '/tmp/mysql-connector-j-8.2.0.jar',
        )
        print("Connection Successful")
        schema = [
            {'name': 'col1', 'type': 'INTEGER'},
            {'name': 'col2', 'type': 'STRING'},
        ]
        # Read from DB and write into BQ
        for chunk in pd.read_sql(sql_query, connection, coerce_float=True, params=None, parse_dates=None, columns=None, chunksize=500000):
            chunk.apply(lambda x: x.replace('\r', ' ').replace('\n', ' ') if isinstance(x, str) else x).to_gbq("my_dataset.my_table", "my_project_id", if_exists='replace')
        return list("1")

options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options.project = 'my_project_id'
google_cloud_options.region = 'gcp_region'
google_cloud_options.temp_location = 'gs://my-bucket/temp'
options.view_as(SetupOptions).requirements_file = '/home/airflow/gcs/dags/config/requirements.txt'
options.view_as(SetupOptions).save_main_session = True

with beam.Pipeline(options=options) as p:
    file_path = 'gs://my_bucket/mysql-connector-j-8.2.0.jar'
    # Create a PCollection with a single element
    data = p | 'Create' >> beam.Create(['1'])
    # Apply the pipeline steps using ParDo
    result = (
        data
        | 'Copy JAR' >> beam.ParDo(CopyJAR(file_path))
        | 'Read From DB' >> beam.ParDo(ReadFromDB())
    )
    result = p.run()
    result.wait_until_finish()
I think the issue here is what XQ Hu suggested (save_main_session=True), but I would like to comment on how to read a file in GCS from Dataflow. The snippet uses the GcsIO module, which is fine, but this can also be solved in a more generic way with the Beam filesystems module. With Beam filesystems you can open a file in a portable way on any storage system Beam supports (local files, GCS, S3, HDFS, etc.).
That is, if you later decide to move the file somewhere else, you don't need to change the code. Another advantage is testing: with the snippet above, you need access to the file in GCS to test your DoFn. With Beam filesystems, you can unit test with a local file.
You only need to change beam.io.gcp.GcsIO().open to beam.io.filesystems.FileSystems.open.