我想问是否有人可以告诉我,甚至给我展示一个数据流作业模板的示例,最好是用 Python 编写,我可以:
我尝试深入研究所有 3 个产品的文档,但发现自己陷入了未记录的 API 的兔子洞中。
我尝试在 Python 中使用 Apache Beam,以便至少尝试使这样的管道工作,例如使用以下定义:
with beam.Pipeline(options=pipeline_options) as p:
_ = (
p
| "Read from Pub/Sub"
>> beam.io.ReadFromPubSub(
subscription=pipeline_options.input_subscription
)
| "Parse JSON" >> beam.Map(json.loads)
| "Process message" >> beam.ParDo(ProcessMessage())
| "Writing row object to BigTable"
>> WriteToBigTable(
project_id=pipeline_options.bigtable_project,
instance_id=pipeline_options.bigtable_instance,
table_id=pipeline_options.bigtable_table,
)
)
我什至不确定
json.loads
是否有效,如果有效,它会以什么格式到达我的“ProcessMessage”类,无论我输入什么键,我都尝试使其通用,但它仍然因错误而失败无法理解:
class ProcessMessage(beam.DoFn):
def process(self, message):
from google.cloud.bigtable import row as row_
import datetime
bt_row = row_.DirectRow(row_key=message.get('id'))
for k, v in message.items():
bt_row.set_cell("default", k.encode(), str(v).encode(), datetime.datetime.now())
yield bt_row
我非常不清楚如何转换我的 JSON 消息,它可能不是扁平的,从 Pub/Sub 流式传输:
{
"id": "12345",
"somekey": "somevalue",
"somekey2": ["some other value"]
}
转换成bigtable中的一行,它动态地将所有键转换为列。我知道bigtable需要一个唯一的行键,所以我有一个ID,但我不知道如何在代码中指定它。
您看过 GitHub 上的数据流食谱示例吗?
下面的代码显示了 apache beam 管道,它读取 pub/sub 订阅并在 bigtable 上写入,以您的输入为例:
import logging
import apache_beam as beam
from apache_beam.io import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.core import DoFn
from google.cloud.bigtable.row import DirectRow
from google.cloud.bigtable.row_data import Cell
from apache_beam.io.gcp.bigtableio import WriteToBigTable
class ConvertToJson(beam.DoFn):
def process(self, element):
import json
yield json.loads(element)
class MakeBigtableRow(DoFn):
def process(self, element):
row = DirectRow(row_key=str(element['id']))
for key, value in element.items():
row.set_cell(
column_family_id='cf1',
column=key,
value=str(value)
)
yield row
def run():
class ReadPubSubOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
"--subscription",
required=True,
help="PubSub subscription to read.",
)
parser.add_argument(
"--project_id",
required=True,
help="Project ID"
)
parser.add_argument(
"--instance_id",
required=True,
help="Cloud Bigtable instance ID"
)
parser.add_argument(
"--table_id",
required=True,
help="Cloud Bigtable table ID"
)
options = ReadPubSubOptions(streaming=True)
with beam.Pipeline(options=options) as p:
(
p
| "Read PubSub subscription"
>> ReadFromPubSub(subscription=options.subscription)
| "Convert to JSON" >> beam.ParDo(ConvertToJson())
| 'Map to Bigtable Row' >> beam.ParDo(MakeBigtableRow())
| "Write to BigTable" >> WriteToBigTable(
project_id=options.project_id,
instance_id=options.instance_id,
table_id=options.table_id
)
| beam.Map(logging.info)
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
run()