How do I stream data from Pub/Sub to Google Bigtable using Dataflow?


I'd like to ask whether anyone can point me to, or better yet show me, an example of a Dataflow job template, preferably written in Python, with which I can:

  1. Continuously read JSON data from a Pub/Sub topic
  2. Process / enrich that data with custom logic
  3. Load the data into an existing Bigtable table

I tried digging into the documentation for all three products, but found myself going down a rabbit hole of undocumented APIs.

I tried using Apache Beam in Python to at least get a pipeline like this working, for example with the following definition:

with beam.Pipeline(options=pipeline_options) as p:
    _ = (
        p
        | "Read from Pub/Sub"
        >> beam.io.ReadFromPubSub(
            subscription=pipeline_options.input_subscription
        )
        | "Parse JSON" >> beam.Map(json.loads)
        | "Process message" >> beam.ParDo(ProcessMessage())
        | "Writing row object to BigTable"
        >> WriteToBigTable(
            project_id=pipeline_options.bigtable_project,
            instance_id=pipeline_options.bigtable_instance,
            table_id=pipeline_options.bigtable_table,
        )
    )

I'm not even sure whether json.loads works here, and if it does, in what format the result reaches my ProcessMessage class. I tried to keep it generic with respect to whatever keys come in, but it still fails with an error I can't make sense of:

class ProcessMessage(beam.DoFn):

    def process(self, message):
        # Local imports so they are available on Dataflow workers
        from google.cloud.bigtable import row as row_
        import datetime
        # Use the message's 'id' as the Bigtable row key
        bt_row = row_.DirectRow(row_key=message.get('id'))
        for k, v in message.items():
            # Write every key/value pair as a cell in the 'default' column family
            bt_row.set_cell("default", k.encode(), str(v).encode(), datetime.datetime.now())
        yield bt_row

What is really unclear to me is how to transform my JSON message, which may not be flat, as it streams in from Pub/Sub:

{
   "id": "12345",
   "somekey": "somevalue",
   "somekey2": ["some other value"]
}

into a row in Bigtable that dynamically turns every key into a column. I know Bigtable needs a unique row key, which is why I have the ID, but I don't know how to specify it in the code.

google-cloud-dataflow apache-beam google-cloud-pubsub google-cloud-bigtable bigtable
1 Answer

Have you looked at the Dataflow cookbook examples on GitHub (the GoogleCloudPlatform/dataflow-cookbook repository)?
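
As for your json.loads doubt: ReadFromPubSub emits each message payload as raw bytes by default, and on Python 3 json.loads accepts bytes directly, so your parse step does hand a plain dict to the next transform. A quick standalone sketch of what the parse step sees (the payload below is just your sample message, inlined for illustration):

    import json

    # ReadFromPubSub (with the default with_attributes=False) emits the
    # raw message payload as bytes; json.loads parses bytes directly.
    payload = b'{"id": "12345", "somekey": "somevalue", "somekey2": ["some other value"]}'
    message = json.loads(payload)
    print(type(message))   # <class 'dict'>
    print(message["id"])   # 12345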

The code below shows an Apache Beam pipeline that reads from a Pub/Sub subscription and writes to Bigtable, using your input as the example:

    import logging
    
    import apache_beam as beam
    from apache_beam.io import ReadFromPubSub
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.transforms.core import DoFn
    from google.cloud.bigtable.row import DirectRow
    from apache_beam.io.gcp.bigtableio import WriteToBigTable
    
    class ConvertToJson(beam.DoFn):
        """Parses the raw Pub/Sub payload (bytes) into a Python dict."""
        def process(self, element):
            import json  # local import so it is available on Dataflow workers
            yield json.loads(element)
    
    class MakeBigtableRow(DoFn):
        """Builds one Bigtable DirectRow per message: the 'id' field becomes
        the row key and every key/value pair becomes a cell in the 'cf1'
        column family (which must already exist on the table)."""
        def process(self, element):
            row = DirectRow(row_key=str(element['id']))
            for key, value in element.items():
                row.set_cell(
                    column_family_id='cf1',
                    column=key.encode('utf-8'),
                    value=str(value).encode('utf-8')
                )
            yield row
    
    def run():
        class ReadPubSubOptions(PipelineOptions):
            @classmethod
            def _add_argparse_args(cls, parser):
                parser.add_argument(
                    "--subscription",
                    required=True,
                    help="PubSub subscription to read.",
                )
                parser.add_argument(
                    "--project_id",
                    required=True,
                    help="Project ID"
                )
                parser.add_argument(
                    "--instance_id",
                    required=True,
                    help="Cloud Bigtable instance ID"
                )
                parser.add_argument(
                    "--table_id",
                    required=True,
                    help="Cloud Bigtable table ID"
                )
        options = ReadPubSubOptions(streaming=True)
    
        with beam.Pipeline(options=options) as p:
            (
                p
                | "Read PubSub subscription"
                >> ReadFromPubSub(subscription=options.subscription)
                | "Convert to JSON" >> beam.ParDo(ConvertToJson())
                | 'Map to Bigtable Row' >> beam.ParDo(MakeBigtableRow())
                | "Write to BigTable" >> WriteToBigTable(
                    project_id=options.project_id,
                    instance_id=options.instance_id,
                    table_id=options.table_id
                )
                # Note: WriteToBigTable emits no elements downstream, so there
                # is nothing useful to log after this step.
            )
    
    if __name__ == "__main__":
        logging.getLogger().setLevel(logging.INFO)
        run()
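
Because your JSON may not be flat, you may also want to flatten nested structures before building the row. Below is a minimal sketch of that idea; the flatten_json helper and the dotted-key convention are my own illustration, not something from the cookbook. Nested dicts become dotted column qualifiers, and lists are serialized to JSON strings:

    import json

    def flatten_json(obj, prefix=""):
        """Flatten nested dicts into dotted keys, e.g. {'a': {'b': 1}}
        becomes {'a.b': 1}; lists are stored as JSON strings."""
        flat = {}
        for key, value in obj.items():
            full_key = f"{prefix}.{key}" if prefix else key
            if isinstance(value, dict):
                flat.update(flatten_json(value, full_key))
            elif isinstance(value, list):
                flat[full_key] = json.dumps(value)
            else:
                flat[full_key] = value
        return flat

You would call flatten_json(element) at the top of MakeBigtableRow.process before iterating over the items; the row key still comes from the original top-level 'id' field.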

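Note that WriteToBigTable only writes cells: the table and the column family ('cf1' above) must already exist, for example created up front with the cbt CLI (cbt createtable <table> followed by cbt createfamily <table> cf1). Assuming the script is saved as pipeline.py, launching it on the Dataflow runner would look roughly like this; the project, bucket, and resource names are placeholders:

    python pipeline.py \
        --runner=DataflowRunner \
        --project=my-project \
        --region=us-central1 \
        --temp_location=gs://my-bucket/temp \
        --subscription=projects/my-project/subscriptions/my-sub \
        --project_id=my-project \
        --instance_id=my-bigtable-instance \
        --table_id=my-table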