无法从 Dataflow 中的 GCS 复制文件

问题描述 投票:0回答:1

我正在尝试将文件从 GCS 存储桶复制到数据流中的 /tmp 位置。为此,我尝试了下面的代码 -

import apache_beam as beam
....
....
    class copyFile(beam.DoFn):
        def __init__(self, gcs_path):
            self.gcs_path = gcs_path
    
        def process(self, element):
            with beam.io.gcp.GcsIO().open(self.gcs_path, 'rb') as src, \
                open('/tmp/my_file.jar', 'wb') as dest:
                    dest.write(src.read())
            # Log a success message
            print('File copied to /tmp.')
            yield element

当我使用 Direct Runner 运行时,此代码工作正常,但当我使用 Dataflow Runner 运行时,会抛出错误 -

NameError: name 'beam' is not defined
。我不明白原因。请问有人可以帮忙解决这个问题吗?

嗨 AInguva,XQ Hu, 感谢你的回复。我已将我尝试过的完整代码粘贴到此处。根据建议,我还添加了 save_main_session = True 选项。但是,它仍然无法与“DataflowRunner”一起使用。

import apache_beam as beam
import os
import subprocess
import jaydebeapi
import pandas as pd
from google.cloud import bigquery
import logging
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, WorkerOptions
        
print("Starting the script")


class CopyJAR(beam.DoFn):
    def __init__(self, file_path):
        self.file_path = file_path

    def process(self, element):
        with beam.io.gcp.GcsIO().open(self.file_path, 'rb') as src, \
            open('/tmp/mysql-connector-j-8.2.0.jar', 'wb') as dest:
                dest.write(src.read())
        print('File copied.')       
        return list("1")      

class ReadFromDB(beam.DoFn):    
    def process(self, element):        
        jdbc_url = 'jdbc:mysql://public_ip:3306/my_db'
        username = 'user'
        password = 'password'

        sql_query = 'select col1,col2 from my_table;'

        connection = jaydebeapi.connect(
            'com.mysql.cj.jdbc.Driver',
            jdbc_url,
            [username, password],
            '/tmp/mysql-connector-j-8.2.0.jar', 
        )        
        print("Connection Successful")

        schema = [
            {'name': 'col1', 'type': 'INTEGER'},
            {'name': 'col2', 'type': 'STRING'},
            ]
        
        #Read from DB and write into BQ
        for chunk in pd.read_sql(sql_query, connection, coerce_float=True, params=None, parse_dates=None, columns=None,chunksize=500000):
            chunk.apply(lambda x: x.replace('\r', ' ').replace('\n', ' ') if isinstance(x, str) else x).to_gbq("my_dataset.my_table", "my_project_id",if_exists='replace')

        return list("1")


options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options.project = 'my_project_id'
google_cloud_options.region = 'gcp_region'
google_cloud_options.temp_location = 'gs://my-bucket/temp'
options.view_as(SetupOptions).requirements_file='/home/airflow/gcs/dags/config/requirements.txt'
options.view_as(SetupOptions).save_main_session = True


with beam.Pipeline(options=options) as p:
    file_path = 'gs://my_bucket/mysql-connector-j-8.2.0.jar'   

    # Create a PCollection with a single element
    data = p | 'Create' >> beam.Create(['1'])

    # Apply the pipeline steps using ParDo
    result = (
        data
        | 'Copy JAR' >> beam.ParDo(CopyJAR(file_path))
        | 'Read From DB' >> beam.ParDo(ReadFromDB())
    )

    result = p.run()
    result.wait_until_finish()        
google-cloud-dataflow
1个回答
0
投票

我认为这里的问题是XQ Hu建议的(

save_main_session=True
),但我想评论一下如何从Dataflow读取GCS中的文件。该代码片段使用
GcsIO
模块,这很好,但是也可以使用 Beam 文件系统模块以更通用的方式解决这个问题。使用 Beam 文件系统,您可以在 Beam 支持的任何存储系统(本地文件、GCS、S3、HDFS 等)中以便携式方式打开文件。

也就是说,如果您决定将文件移动到其他地方,则无需更改代码。另一个优点是测试。使用上面的代码片段,您需要访问 GCS 中的文件来测试您的

DoFn
。使用 Beam 文件系统,您可以使用本地文件进行单元测试。

您只需将

beam.gcp.GcsIO().open
更改为
beam.io.filesystems.FileSystems.open

© www.soinside.com 2019 - 2024. All rights reserved.