我正在研究Dataflow,我已经通过Python SDK建立了我的自定义管道。我想在Dataflow UI中使用附加参数添加参数到我的自定义管道中。参考者 https:/cloud.google.comdataflowdocsguidestemplatescreating-templates#staticvalue。
然后我就换了 add_argument
到 add_value_provider_argument
遵循谷歌文档
class CustomParams(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
"--input_topic",
type = str,
)
parser.add_value_provider_argument(
"--window_size",
type = int,
default = 5,
)
def run():
pipeline_options = PipelineOptions(pipeline_args, .....)
custom_param = pipeline_options.view_as(CustomParams)
.....
pipeline | "Read PubSub Message" >> beam.io.ReadFromPubSub(custom_param.input_topic)
之后,我尝试制作一个模板到GCP。上传的脚本是这样的
python custom_pipeline.py \
--runner DataflowRunner \
--project YOUR_PROJECT_ID \
--staging_location gs://YOUR_BUCKET_NAME/staging \
--temp_location gs://YOUR_BUCKET_NAME/temp \
--template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME
但是当我创建模板上传到GCS时,我得到了一个错误,就像这样。
TypeError: expected string or bytes-like object
在行 beam.io.ReadFromPubSub()
它看起来就像我从... add_value_provider_argument
是 RuntimeValueProvider 对象。 所以我很困惑我应该怎么做才能解决这个问题?
我尝试修复这个问题,比如
铸造数据类型
beam.io.ReadFromPubSub(str(custom_param.input_topic))
但得到了这个错误。
ValueError: PubSub topic must be in the form "projects/<project>/topics/<topic>" (got "RuntimeValueProvider(option: input_topic, type: str, default_value: '...')").
所以,请谁有故障排除这个?我不知道如何去没有它。
正如 @mk_sta 所述
似乎ReadFromPubSub模块不接受ValueProvider。你有没有检查这个Stack线程?
并在其中解释了 纫, ReadFromPubSub does not currently accept ValueProvider arguments since it is implemented as a native transform in Dataflow.
您可以查看 接受运行时参数的IO方法 对于 ValueProvider
在不同SDK中的支持。
所以此刻,如果你从Python SDK切换到Java SDK,就会出现 阅览 的PubSubIO确实支持ValueProvider。