I can run my pipeline locally with the Direct Runner without any problems, but when I deploy it to Dataflow I get the following error:
"Error message from worker: generic::unknown: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 762, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 885, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 219, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 183, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
File "apache_beam/runners/worker/opcounters.py", line 217, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
File "apache_beam/runners/worker/opcounters.py", line 255, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
File "apache_beam/coders/coder_impl.py", line 1311, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1322, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 919, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 200, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1401, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
File "apache_beam/coders/coder_impl.py", line 239, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
File "apache_beam/coders/coder_impl.py", line 393, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 403, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 443, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_special_deterministic
TypeError: Unable to deterministically encode '<TableReference
datasetId: 'my-dataset-id'
projectId: 'my-project-id'
tableId: 'my-table'>' of type '<class 'apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_messages.TableReference'>', please provide a type hint for the input of 'Write good logs to BigQuery/_StreamToBigQuery/CommitInsertIds/GroupByKey'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
/---/
File "apache_beam/coders/coder_impl.py", line 403, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 443, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_special_deterministic
TypeError: Unable to deterministically encode '<TableReference
datasetId: 'string1'
projectId: 'string2'
tableId: 'string3'>' of type '<class 'apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_messages.TableReference'>', please provide a type hint for the input of 'Writing logs to BigQuery/_StreamToBigQuery/CommitInsertIds/GroupByKey' [while running 'Writing logs to BigQuery/_StreamToBigQuery/CommitInsertIds/Map(reify_timestamps)-ptransform-9454']
Here is the pipeline code:
logs = (p
        | beam.io.ReadFromPubSub(topic=topic)
        | 'Decode Json' >> beam.ParDo(ListProcessor())
        | 'Add Timestamp' >> beam.Map(lambda log: beam.window.TimestampedValue(log, time.time()))
        | 'Window into Fixed Intervals' >> beam.WindowInto(beam.window.FixedWindows(10))
        | 'Calculate Pipeline Duration' >> beam.ParDo(DurationProcessor())
        )

additional_bq_parameters = {
    'timePartitioning': {'type': 'DAY', 'field': 'created_at'}}

errors = (logs | 'Write logs to BigQuery' >> beam.io.WriteToBigQuery(
    '{0}:{1}.{2}'.format(project, collection, table_name),
    schema=schema,
    additional_bq_parameters=additional_bq_parameters,
    insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
    create_disposition=beam.io.gcp.bigquery.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.gcp.bigquery.BigQueryDisposition.WRITE_APPEND))
Can you help me? I have looked through the source code without success. Thanks!
To prevent the error mentioned in this question, the fix is to use Apache Beam version 2.30.0 or later, as reported in https://issues.apache.org/jira/browse/BEAM-12079.
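If your deployment installs dependencies from a requirements file, a minimal pin that picks up the BEAM-12079 fix could look like this (the `[gcp]` extra is needed for the BigQuery and Pub/Sub I/O connectors used in the pipeline above):

```
# requirements.txt — pin Beam to a release that includes the BEAM-12079 fix
apache-beam[gcp]>=2.30.0
```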
I hit this error in a test and fixed it by using apache_beam.testing.test_stream.TestStream:
test_stream = TestStream()
test_stream.add_elements(input_data)

with TestPipeline() as testpipeline:
    results = (
        testpipeline
        | test_stream
        | "do things" >> beam.ParDo(DoThings())
    )