想要使用 Apache Beam 通过过滤管道中的事件来动态命名和创建表?

问题描述 投票:0回答:1

我有一个用例,我在一个事件驱动的架构中从 pub/sub 监听并且想要动态地存储和插入数据到表中。如果在流中注意到一个新的 eventName,我想创建新表,例如,如果我有一个用户创建的流数据,其名称在属性(这是一个 python 字典)中,作为 CREATED,那么应该发生的是这应该在用于用例的特定数据集中自动创建一个表,并开始将数据写入该特定表的新表中。我已经尝试了以下方法,但对我没有用,请帮忙:

class FilterEvents(beam.DoFn):
    def process(self, element):
        events = []
        event_name = element['event_name']
        for i in event_name:
            res.extend(list(i.values()))
        res=list(set(res))   
        if event_name in events:
            yield element

filtered_events = events | beam.ParDo(FilterEvents())

# Step 4: Use the Partition transform to split the filtered events into multiple PCollections based on the event name.
def partition_fn(element):
    event_name = element['event_name']
    if event_name == 'event1':
        return 0
    elif event_name == 'event2':
        return 1
    elif event_name == 'event3':
        return 2

partitioned_events = filtered_events | beam.Partition(partition_fn, 3)

# Step 5: Write the events in each partition to a separate table.
def write_to_table(elements, table_name):
    elements | beam.GroupByKey() | beam.io.WriteToBigQuery(
        table=table_name,
        # schema=<table_schema>,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

partitioned_events[0] | beam.Map(lambda element: (element['event_name'], element)) | beam.WindowInto(window.FixedWindows(60)) | beam.ParDo(lambda x: write_to_table(x[1], "event1_table"))
partitioned_events[1] | beam.Map(lambda element: (element['event_name'], element)) | beam.WindowInto(window.FixedWindows(60)) | beam.ParDo(lambda x: write_to_table(x[1], "event2_table"))
partitioned_events[2] | beam.Map(lambda element: (element['event_name'], element)) | beam.WindowInto(window.FixedWindows(60)) | beam.ParDo(lambda x: write_to_table(x[1], "event3_table"))

pipeline.run()
python google-cloud-platform google-bigquery apache-beam
1个回答
0
投票

使用 Apache beam 的标准

WriteToBigQuery
接收器可以相对容易地完成此操作(参见 docs)。

特别是,您可以在

table
参数中提供一个自定义函数,该函数具有一个参数
element
并返回一个表示表名的字符串。所以你可以跳过整个分区,只做这样的事情:

filtered_events
| beam.io.gcp.bigquery.WriteToBigQuery(
 table=lambda element: f"{element['event_name']}_table",  # <- replace here with custom logic
 write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
 create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)

你可以用更复杂的自定义方法替换简单的 lambda,这适合你的情况。甚至可以提供您也可以通过此方法访问的辅助输入(参见此处)。

© www.soinside.com 2019 - 2024. All rights reserved.