Hi, I am still learning Apache Beam, and below is a script I wrote. It reads from Pub/Sub and writes to BigQuery, converting the PCollection to a dataframe in between for further data transformations. However, I am getting the error RuntimeError: NotImplementedError [while running 'BatchElements(messages)']. What am I missing?
Thanks.
My code:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.dataframe.convert import to_dataframe, to_pcollection
import os
import typing
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "my_json_file.json"
# Define a NamedTuple schema for the parsed rows
class BmsSchema(typing.NamedTuple):
    ident: str

# DoFn that parses a Pub/Sub message from the subscription into a dict
class ParsePubSubMessage(beam.DoFn):
    def process(self, message):
        import json
        # Create the main_dict that has all the columns, defaulting to None
        all_columns = ['ident']
        main_dict = dict(zip(all_columns, [None] * len(all_columns)))
        # Parse the JSON message and overlay it on the defaults
        record = json.loads(message.decode('utf-8'))
        main_dict.update(record)
        yield {
            'ident': main_dict["ident"]
        }
def run():
    # Define pipeline options
    options = PipelineOptions(
        project='dwingestion',
        runner='DirectRunner',
        streaming=True,  # Enable streaming mode
        temp_location='gs://........./temp',
        staging_location='gs://....../staging',
        region='europe-west1',
        job_name='flesp-streaming-pipeline-dataflow-test'
    )
    # Set streaming mode
    options.view_as(StandardOptions).streaming = True

    # Pub/Sub subscription
    input_subscription = 'projects/...../subscriptions/flespi_data_streaming'

    table_schema = {
        "fields": [
            {"name": "ident", "type": "STRING", "mode": "NULLABLE"}
        ]
    }
    # Create the pipeline
    with beam.Pipeline(options=options) as p:
        # Read from Pub/Sub, parse the messages, and attach the schema
        messages = (p
                    | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                    | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                    | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                    )

        # Convert the messages to a dataframe and back to a PCollection
        df = to_dataframe(messages)
        transformed_pcol = to_pcollection(df)

        # Write to BigQuery using the explicit schema above
        transformed_pcol | 'Write to BigQuery' >> WriteToBigQuery(
            table='.........flesp_table_test_4',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            schema=table_schema,
            custom_gcs_temp_location='gs://........../temp'
        )

if __name__ == '__main__':
    run()
I solved the problem above by changing the runner from DirectRunner to DataflowRunner. DirectRunner, which I was using for local testing, does not appear to fully support all of the to_dataframe functionality when streaming is set to True (the input above is streaming data from a Pub/Sub topic).
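For reference, a minimal sketch of the change that resolved it, assuming everything else in the pipeline stays as above (the placeholder GCS paths are the same elided ones from the original code):

options = PipelineOptions(
    project='dwingestion',
    runner='DataflowRunner',  # switched from DirectRunner
    streaming=True,
    temp_location='gs://........./temp',
    staging_location='gs://....../staging',
    region='europe-west1',
    job_name='flesp-streaming-pipeline-dataflow-test'
)

If you still need to test locally on DirectRunner, one possible workaround (a sketch, not part of the original fix) is to skip the DataFrame round trip entirely, since the failure happens in the BatchElements step that the conversion runs on the messages PCollection; a plain beam.Map can produce the dicts that WriteToBigQuery expects:

# Hypothetical alternative for local testing: no DataFrame conversion,
# just map the schema'd rows back to dicts before writing.
rows = messages | 'To dict' >> beam.Map(lambda row: {'ident': row.ident})
rows | 'Write to BigQuery' >> WriteToBigQuery(
    table='.........flesp_table_test_4',
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    schema=table_schema,
    custom_gcs_temp_location='gs://........../temp'
)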