Create and insert multiple rows into BigQuery from a single JSON with PubSub and Dataflow


I've created a Beam Dataflow pipeline that parses a single JSON message from a PubSub topic:

{
    "data": "test data",
    "options": {
        "test options": "test",
        "test_units": {
            "test": {
                "test1": "test1",
                "test2": "test2"
            },
            "test2": {
                "test1": "test1",
                "test2": "test2"
            },
            "test3": {
                "test1": "test1",
                "test2": "test2"
            }
        }
    }
}

My desired output looks like this:

{
  "data": "test data",
  "test_test_unit": "test1",
  "test_test_unit": "test2",
  "test1_test_unit": "test1",
    ...
},
{
  "data": "test data",
  "test_test_unit": "test1",
  "test_test_unit": "test2",
  "test1_test_unit": "test1",
    ...
}

Essentially, what I'm doing is flattening the data: for however many test_units there are in the JSON from PubSub, I produce that many rows and return them in a single dict.

I created a class to flatten the data and return the dict of rows.

Here is my Beam pipeline:

lines = ( p | 'Read from PubSub' >> beam.io.ReadStringsFromPubSub(known_args.input_topic)
            | 'Parse data' >> beam.ParDo(parse_pubsub())
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                known_args.output_table,
                schema=table_schema,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
        )
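
For context, known_args and table_schema are defined elsewhere in the script; a minimal sketch of what they might look like (the flag names follow the pipeline above, and the schema fields are assumptions based on the example output):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--input_topic', help='PubSub topic to read from')
parser.add_argument('--output_table', help='BigQuery table as PROJECT:DATASET.TABLE')
known_args, pipeline_args = parser.parse_known_args()

# WriteToBigQuery accepts a schema given as a comma-separated 'name:TYPE' string.
table_schema = 'data:STRING,test_test_unit:STRING,test1_test_unit:STRING'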

And here is the class that handles the flattening:

class parse_pubsub(beam.DoFn):
    def process(self, element):
        # ...
        # flattens the data
        # ...
        return rows
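
The flattening itself is elided above; a minimal sketch of what it might look like, assuming the field names from the example JSON and the dict-of-rows return value described earlier:

import json
import apache_beam as beam

class parse_pubsub(beam.DoFn):
    def process(self, element):
        record = json.loads(element)
        rows = {}
        # Build one flattened row per entry under options.test_units,
        # keyed by position, each carrying the top-level "data" field.
        for i, (unit_name, unit) in enumerate(record['options']['test_units'].items()):
            row = {'data': record['data']}
            for key, value in unit.items():
                row['%s_%s_unit' % (unit_name, key)] = value
            rows[i] = row
        return rows  # a dict of rows, which turns out to be the problem (see below)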

Here is the error from the Stackdriver logs:

Error processing instruction -138. Original traceback is Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 151, in _execute
    response = task()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 186, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 265, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 281, in process_bundle
    delayed_applications = bundle_processor.process_bundle(instruction_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 552, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 549, in apache_beam.runners.worker.operations.DoOperation.finish
    def finish(self):
  File "apache_beam/runners/worker/operations.py", line 550, in apache_beam.runners.worker.operations.DoOperation.finish
    with self.scoped_finish_state:
  File "apache_beam/runners/worker/operations.py", line 551, in apache_beam.runners.worker.operations.DoOperation.finish
    self.dofn_runner.finish()
  File "apache_beam/runners/common.py", line 758, in apache_beam.runners.common.DoFnRunner.finish
    self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
  File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 777, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 750, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
    bundle_method()
  File "apache_beam/runners/common.py", line 361, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
    def invoke_finish_bundle(self):
  File "apache_beam/runners/common.py", line 365, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
    self.signature.finish_bundle_method.method_value())
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 630, in finish_bundle
    self._flush_batch()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 637, in _flush_batch
    table_id=self.table_id, rows=self._rows_buffer)

  # HERE:
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery_tools.py", line 611, in insert_rows
    for k, v in iteritems(row):
  File "/usr/local/lib/python2.7/dist-packages/future/utils/__init__.py", line 308, in iteritems
    func = obj.items
AttributeError: 'int' object has no attribute 'items'
[while running 'generatedPtransform-135']

I also tried returning a list and got the same kind of error, 'list' object has no attribute 'items', so I converted the list of rows into a dict like this:

{
  0: {
    "data": "test data",
    "test_test_unit": "test1",
    "test_test_unit": "test2",
    "test1_test_unit": "test1",
      ...
  },
  1: {
    "data": "test data",
    "test_test_unit": "test1",
    "test_test_unit": "test2",
    "test1_test_unit": "test1",
      ...
  }
}

I'm quite new to this, so any help would be greatly appreciated!

python python-2.7 google-bigquery google-cloud-dataflow google-cloud-pubsub
1 Answer

You need to use the yield keyword to emit multiple outputs from your DoFn. For example:

class parse_pubsub(beam.DoFn):
  def process(self, element):
    # ...
    # flattens the data
    # ...
    for row in rows:
      yield row
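
This also explains the errors above: when process returns a container, Beam iterates over it to get the output elements, and iterating a dict yields its keys, not its values. With the integer-keyed dict shown in the question, WriteToBigQuery ends up calling .items() on an int while flushing its row buffer, which is exactly the AttributeError in the traceback. A standalone illustration:

rows = {0: {'data': 'test data'}, 1: {'data': 'test data'}}
for row in rows:   # iterating a dict yields its KEYS (0 and 1), not the row dicts
    row.items()    # AttributeError: 'int' object has no attribute 'items'

Yielding each row dict individually (or returning a list of row dicts) hands WriteToBigQuery one mapping per row, which is what its insert path expects.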