Apache Beam Dataflow job TypeError: "Unable to deterministically encode <TableReference>, please provide a type hint"


I can run my pipeline locally with the Direct Runner without any problems, but when I deploy it to Dataflow I get the following error:

"Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 762, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 885, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 219, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 183, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 217, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 255, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1311, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1322, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 919, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 200, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1401, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
  File "apache_beam/coders/coder_impl.py", line 239, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
  File "apache_beam/coders/coder_impl.py", line 393, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 403, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 443, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_special_deterministic
TypeError: Unable to deterministically encode '<TableReference
 datasetId: 'my-dataset-id'
 projectId: 'my-project-id'
 tableId: 'my-table'>' of type '<class 'apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_messages.TableReference'>', please provide a type hint for the input of 'Write good logs to BigQuery/_StreamToBigQuery/CommitInsertIds/GroupByKey'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()

  /---/

  File "apache_beam/coders/coder_impl.py", line 403, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 443, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_special_deterministic
TypeError: Unable to deterministically encode '<TableReference
 datasetId: 'string1'
 projectId: 'string2'
 tableId: 'string3'>' of type '<class 'apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_messages.TableReference'>', please provide a type hint for the input of 'Writing logs to BigQuery/_StreamToBigQuery/CommitInsertIds/GroupByKey' [while running 'Writing logs to BigQuery/_StreamToBigQuery/CommitInsertIds/Map(reify_timestamps)-ptransform-9454']

Here is the pipeline code:

logs = (p | beam.io.ReadFromPubSub(topic=topic)
            | "Decode Json" >> beam.ParDo(ListProcessor())
            | 'Add Timestamp' >> beam.Map(lambda log: beam.window.TimestampedValue(log, time.time()))
            | 'Window into Fixed Intervals' >> beam.WindowInto(beam.window.FixedWindows(10))
            | 'Calculate Pipeline Duration' >> beam.ParDo(DurationProcessor())
        )


additional_bq_parameters = {
        'timePartitioning': {'type': 'DAY', 'field': 'created_at'}}



errors = (logs | 'Write logs to BigQuery' >> beam.io.WriteToBigQuery(
    '{0}:{1}.{2}'.format(project, collection, table_name),
    schema=schema,
    additional_bq_parameters=additional_bq_parameters,
    insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
    create_disposition=beam.io.gcp.bigquery.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.gcp.bigquery.BigQueryDisposition.WRITE_APPEND))

Can you help me? I have looked through the source code but had no success. Thanks!

google-cloud-platform apache-beam dataflow apache-beam-io
2 Answers
0 votes

To prevent the error described in this question, the fix is to use Apache Beam version 2.30.0 or later, as reported in https://issues.apache.org/jira/browse/BEAM-12079.
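Since Dataflow workers run whatever SDK version the job was submitted with, upgrading the Beam package in the environment you deploy from (and in any requirements file shipped with the job) is usually what matters. A minimal sketch of the upgrade and a version check:

```shell
# Upgrade the Beam SDK, with the GCP extras needed for BigQuery/Pub/Sub I/O
pip install --upgrade 'apache-beam[gcp]>=2.30.0'

# Verify the installed version before redeploying the job
python -c "import apache_beam; print(apache_beam.__version__)"
```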

0 votes

I ran into this error in my tests, and fixed it by using apache_beam.testing.test_stream.TestStream:
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream

test_stream = TestStream()
test_stream.add_elements(input_data)

with TestPipeline() as testpipeline:
  results = (
      testpipeline
      | test_stream
      | "do things" >> beam.ParDo(DoThings())
  )

© www.soinside.com 2019 - 2024. All rights reserved.