Dataflow template fails to launch when pickling


My Dataflow pipeline is as follows:

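    import apache_beam as beam
    from apache_beam import Pipeline, io
    from apache_beam.options.pipeline_options import PipelineOptions

    # pipeline_args, input_topic, event_name, transform_message and
    # publish_to_output_topic are defined elsewhere (not shown here).
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True, sdk_location="container"
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Read event topic"
            >> io.ReadFromPubSub(topic=input_topic).with_output_types(bytes)
            # Pub/Sub delivers raw bytes; decode them to text.
            | "Convert to string" >> beam.Map(lambda msg: msg.decode("utf-8"))
            | "Transform event"
            >> beam.Map(transform_message, event_name=event_name)
            | "Write to output topic"
            >> beam.Map(publish_to_output_topic)
        )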

I'm using Flex Templates to deploy my pipeline. I built the template with the gcloud CLI as follows:

    gcloud dataflow flex-template build gs://mybucket/templates/dataflow-latest.json \
        --image "us-docker.pkg.dev/project_id/dataflow/dataflow:latest" \
        --sdk-language "PYTHON"

I launch the job like this:

    gcloud dataflow flex-template run "test-job" \
        --template-file-gcs-location "gs://mybucket/templates/dataflow-latest.json" \
        --service-account-email "dataflow@project_id.iam.gserviceaccount.com" \
        --staging-location "gs://mybucket/staging/" \
        --temp-location "gs://mybucket/temp/" \
        --parameters event_name="foobuzz" \
        --parameters sdk_container_image="us-docker.pkg.dev/project_id/dataflow/dataflowsdk:latest" \
        --region "us-central2"
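The `--parameters` values reach the Python entry point as ordinary command-line flags (the launcher's "Using launch args" log line shows `--event_name=foobuzz` next to the Dataflow flags). The question does not show how `beam.py` separates its own flags from `pipeline_args`; a common pattern, assumed here, is `argparse.parse_known_args`. The `--input_topic` flag and the `split_args` helper are hypothetical names used only for illustration.

```python
import argparse
from typing import List, Tuple


def split_args(argv: List[str]) -> Tuple[argparse.Namespace, List[str]]:
    """Separate the pipeline's own flags from the Beam/Dataflow flags.

    --event_name mirrors the Flex Template parameter above; --input_topic is a
    hypothetical flag included only for illustration.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--event_name", required=True)
    parser.add_argument("--input_topic", default=None)  # hypothetical flag
    # parse_known_args returns every flag it does not recognise in the second
    # list, which can then be passed straight to PipelineOptions.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


if __name__ == "__main__":
    known, rest = split_args(
        ["--runner=DataflowRunner", "--event_name=foobuzz", "--region=us-central2"]
    )
    print(known.event_name)  # foobuzz
    print(rest)  # ['--runner=DataflowRunner', '--region=us-central2']
```

With this split, `known.event_name` feeds the "Transform event" step, and `rest` becomes the `pipeline_args` given to `PipelineOptions`.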

But my template fails to launch, and I keep getting this error:

{"severity":"INFO","time":"2023/07/11 17:59:12.176036","line":"python_template.go:64","message":"Using launch args: [/dataflow/template/beam.py --runner=DataflowRunner --region=us-central2 --staging_location=gs://mybucket/staging/ --event_name=foobuzz --project=project_id --job_name=test-job --template_location=gs://my-bucket/staging/template_launches/2023-07-11_10_55_55-17803180896608448008/job_object --service_account_email=dataflow@my_project.iam.gserviceaccount.com --temp_location=gs://mybucket/temp/ --sdk_container_image=us-docker.pkg.dev/project_id/dataflow:latest]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.782216","line":"exec.go:66","message":"┬ T4: \u003cclass 'apache_beam.transforms.core.CallableWrapperDoFn'\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.782528","line":"exec.go:66","message":"└ # T4 [54 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.782851","line":"exec.go:66","message":"┬ D2: \u003cdict object at 0x7fcaee860840\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.783157","line":"exec.go:66","message":"├┬ F1: \u003cfunction Map.\u003clocals\u003e.\u003clambda\u003e at 0x7fcaee86e430\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.783359","line":"exec.go:66","message":"│├┬ F2: \u003cfunction _create_function at 0x7fcb0746e1f0\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.783562","line":"exec.go:66","message":"││└ # F2 [34 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.784154","line":"exec.go:66","message":"│├┬ Co: \u003ccode object \u003clambda\u003e at 0x7fcaf7968df0, file \"/usr/local/lib/python3.8/site-packages/apache_beam/transforms/core.py\", line 1900\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.784366","line":"exec.go:66","message":"││├┬ F2: \u003cfunction _create_code at 0x7fcb0746e280\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.784554","line":"exec.go:66","message":"│││└ # F2 [19 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.784825","line":"exec.go:66","message":"││└ # Co [156 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.785078","line":"exec.go:66","message":"│├┬ D4: \u003cdict object at 0x7fcaf7949300\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.785230","line":"exec.go:66","message":"││└ # D4 [38 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.785515","line":"exec.go:66","message":"│├┬ Ce2: \u003ccell at 0x7fcaee87a520: function object at 0x7fcaee86e3a0\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.785693","line":"exec.go:66","message":"││├┬ F2: \u003cfunction _create_cell at 0x7fcb0746ea60\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.785883","line":"exec.go:66","message":"│││└ # F2 [19 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.786047","line":"exec.go:66","message":"││└ # Ce2 [24 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.786211","line":"exec.go:66","message":"│├┬ D2: \u003cdict object at 0x7fcaee860d00\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.786379","line":"exec.go:66","message":"││├┬ F1: \u003cfunction run.\u003clocals\u003e.\u003clambda\u003e at 0x7fcaee86e3a0\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.786567","line":"exec.go:66","message":"│││├┬ Co: \u003ccode object \u003clambda\u003e at 0x7fcb07d62240, file \"/dataflow/template/beam.py\", line 220\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.786850","line":"exec.go:66","message":"││││└ # Co [99 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.789579","line":"exec.go:66","message":"│││├┬ D1: \u003cdict object at 0x7fcb07e86fc0\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.789761","line":"exec.go:66","message":"││││└ # D1 [22 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.789954","line":"exec.go:66","message":"│││├┬ D2: \u003cdict object at 0x7fcaee85b0c0\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.790158","line":"exec.go:66","message":"││││├┬ T6: \u003cclass 'apache_beam.typehints.decorators.IOTypeHints'\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.790344","line":"exec.go:66","message":"│││││├┬ F2: \u003cfunction _create_namedtuple at 0x7fcb074700d0\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.790524","line":"exec.go:66","message":"││││││└ # F2 [25 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.790718","line":"exec.go:66","message":"│││││└ # T6 [118 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.790898","line":"exec.go:66","message":"││││└ # D2 [143 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.791050","line":"exec.go:66","message":"│││├┬ D2: \u003cdict object at 0x7fcaee866600\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.791219","line":"exec.go:66","message":"││││├┬ D2: \u003cdict object at 0x7fcaef60f080\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.791370","line":"exec.go:66","message":"│││││└ # D2 [2 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.791529","line":"exec.go:66","message":"││││└ # D2 [63 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.791681","line":"exec.go:66","message":"│││└ # F1 [341 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.791824","line":"exec.go:66","message":"││└ # D2 [358 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.791978","line":"exec.go:66","message":"│├┬ D2: \u003cdict object at 0x7fcaee86f200\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.792141","line":"exec.go:66","message":"││├┬ D2: \u003cdict object at 0x7fcaee800ac0\u003e"}
{"severity":"INFO","time":"2023/07/11 17:59:27.792287","line":"exec.go:66","message":"│││└ # D2 [2 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.792453","line":"exec.go:66","message":"││└ # D2 [34 B]"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805605","line":"exec.go:66","message":"Traceback (most recent call last):"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805641","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/site-packages/apache_beam/internal/dill_pickler.py\", line 246, in dumps"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805661","line":"exec.go:66","message":"    s = dill.dumps(o, byref=settings['dill_byref'])"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805675","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 263, in dumps"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805691","line":"exec.go:66","message":"    dump(obj, file, protocol, byref, fmode, recurse, **kwds)#, strictio)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805707","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 235, in dump"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805722","line":"exec.go:66","message":"    Pickler(file, protocol, **_kwds).dump(obj)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805735","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 394, in dump"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805749","line":"exec.go:66","message":"    StockPickler.dump(self, obj)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805766","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/pickle.py\", line 487, in dump"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805780","line":"exec.go:66","message":"    self.save(obj)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805792","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 388, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805813","line":"exec.go:66","message":"    StockPickler.save(self, obj, save_persistent_id)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805828","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/pickle.py\", line 603, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805842","line":"exec.go:66","message":"    self.save_reduce(obj=obj, *rv)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805854","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/pickle.py\", line 717, in save_reduce"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805870","line":"exec.go:66","message":"    save(state)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805883","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 388, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805897","line":"exec.go:66","message":"    StockPickler.save(self, obj, save_persistent_id)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805910","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/pickle.py\", line 560, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805924","line":"exec.go:66","message":"    f(self, obj)  # Call unbound method with explicit self"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805937","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/site-packages/apache_beam/internal/dill_pickler.py\", line 216, in new_save_module_dict"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805953","line":"exec.go:66","message":"    return old_save_module_dict(pickler, obj)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805965","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 1186, in save_module_dict"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805980","line":"exec.go:66","message":"    StockPickler.save_dict(pickler, obj)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.805993","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/pickle.py\", line 971, in save_dict"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806007","line":"exec.go:66","message":"    self._batch_setitems(obj.items())"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806019","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/pickle.py\", line 997, in _batch_setitems"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806033","line":"exec.go:66","message":"    save(v)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806045","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 388, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806059","line":"exec.go:66","message":"    StockPickler.save(self, obj, save_persistent_id)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806072","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/pickle.py\", line 560, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806087","line":"exec.go:66","message":"    f(self, obj)  # Call unbound method with explicit self"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806100","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 1824, in save_function"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806114","line":"exec.go:66","message":"    _save_with_postproc(pickler, (_create_function, ("}
{"severity":"INFO","time":"2023/07/11 17:59:27.806127","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 1089, in _save_with_postproc"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806141","line":"exec.go:66","message":"    pickler.save_reduce(*reduction)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806153","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/pickle.py\", line 691, in save_reduce"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806167","line":"exec.go:66","message":"    save(func)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806178","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 388, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806196","line":"exec.go:66","message":"    StockPickler.save(self, obj, save_persistent_id)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806211","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/pickle.py\", line 603, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806224","line":"exec.go:66","message":"    self.save_reduce(obj=obj, *rv)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806236","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/pickle.py\", line 692, in save_reduce"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806251","line":"exec.go:66","message":"    save(args)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806263","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 388, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806278","line":"exec.go:66","message":"    StockPickler.save(self, obj, save_persistent_id)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806291","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/pickle.py\", line 560, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806304","line":"exec.go:66","message":"    f(self, obj)  # Call unbound method with explicit self"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806325","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/pickle.py\", line 886, in save_tuple"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806351","line":"exec.go:66","message":"    save(element)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806363","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/site-packages/dill/_dill.py\", line 388, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806378","line":"exec.go:66","message":"    StockPickler.save(self, obj, save_persistent_id)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806391","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/pickle.py\", line 560, in save"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806405","line":"exec.go:66","message":"    f(self, obj)  # Call unbound method with explicit self"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806418","line":"exec.go:66","message":"  File \"/usr/local/lib/python3.8/site-packages/apache_beam/internal/dill_pickler.py\", line 170, in save_module"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806464","line":"exec.go:66","message":"    dill.dill.log.info('M2: %s' % obj)"}
{"severity":"INFO","time":"2023/07/11 17:59:27.806477","line":"exec.go:66","message":"AttributeError: module 'dill._dill' has no attribute 'log'"}

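I'm not sure where to look. It doesn't look like either dill or the Beam SDK has had a new release recently. I tried this a few days ago and it worked fine, so I'm not sure where exactly to start. There seems to be a problem serializing the Dataflow pipeline.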
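The last frames of the traceback show Beam's `dill_pickler.save_module` calling `dill.dill.log.info(...)`, and the error says `module 'dill._dill' has no attribute 'log'`. A minimal sketch for repeating just that lookup, assuming you run it in the same Python environment as the launcher image; `dill_has_log_attribute` is a hypothetical helper name:

```python
import importlib
from typing import Optional


def dill_has_log_attribute() -> Optional[bool]:
    """Repeat the attribute lookup that fails in the traceback.

    Beam's dill_pickler.save_module calls dill.dill.log.info(...), and the
    error reports that module 'dill._dill' has no attribute 'log'.
    Returns None when dill is not installed in this environment.
    """
    try:
        dill_internal = importlib.import_module("dill._dill")
    except ImportError:
        return None
    return hasattr(dill_internal, "log")


if __name__ == "__main__":
    result = dill_has_log_attribute()
    if result is None:
        print("dill is not installed in this environment")
    elif result:
        print("dill._dill.log exists; the failing call would find it")
    else:
        print("dill._dill.log is missing; the call in save_module would raise AttributeError")
```

If this prints that the attribute is missing, the installed dill does not match what that Beam code path expects, which points at the dependency versions rather than at the pipeline code.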

python google-cloud-platform google-cloud-dataflow apache-beam
1 Answer

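It turns out that something in my pipeline could not be serialized by dill (I'm not sure exactly which part). After digging in and reading this, I was able to get my pipeline running by switching the serialization library from dill to cloudpickle: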

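    from apache_beam.options.pipeline_options import PipelineOptions

    pipeline_options = PipelineOptions(
        pipeline_args,
        streaming=True,
        save_main_session=True,
        sdk_location="container",
        # Serialize the pipeline with cloudpickle instead of dill.
        pickle_library="cloudpickle",
    )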
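For context on why Beam needs dill or cloudpickle at all: the standard library's `pickle` stores a function by reference (module plus qualified name), so a lambda created inside a function, like the `run.<locals>.<lambda>` that appears in the pickling trace, cannot be serialized by it. dill and cloudpickle can serialize such functions by value. A standard-library-only sketch; `can_stdlib_pickle` and this `run` are hypothetical stand-ins, not the question's code:

```python
import pickle
from typing import Any, Callable


def can_stdlib_pickle(obj: Any) -> bool:
    """Return True if the standard-library pickle can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        # A local function is pickled by reference, and a qualified name
        # such as "run.<locals>.<lambda>" cannot be looked up again, so
        # pickle refuses it (AttributeError or PicklingError by Python version).
        return False
    return True


def run() -> Callable[[bytes], str]:
    # Mirrors the "Convert to string" step: a lambda created inside run().
    return lambda msg: msg.decode("utf-8")


if __name__ == "__main__":
    print(can_stdlib_pickle({"event_name": "foobuzz"}))  # True
    print(can_stdlib_pickle(run()))  # False
```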

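Note that although cloudpickle is more expressive than dill, it is still considered experimental (the issue is here). I hope cloudpickle becomes the default serialization library; if it doesn't, the dill version should be kept consistent with the one the Beam SDK expects (here). Thanks to XQ Hu for pointing this out.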

© www.soinside.com 2019 - 2024. All rights reserved.