无法转换包含类型Map的PCollection - “错误的参数个数”

问题描述 投票:0回答:1

我正在尝试使用ParDo将变换应用于Apache梁中的Map对象的PCollection。我指定Map作为第一个DoFn的输出类型,并指定Map作为下一个DoFn的输入类型。但是,我收到以下错误:

[错误]无法在项目word-count-beam上执行目标org.codehaus.mojo:exec-maven-plugin:1.6.0:java(default-cli):执行Java类时发生异常。调用Coder工厂方法时出错public static org.apache.beam.sdk.coders.MapCoder org.apache.beam.sdk.coders.MapCoder.of(org.apache.beam.sdk.coders.Coder,org.apache.beam .sdk.coders.Coder):错误的参数数量 - > [帮助1]

这是我的代码:

  static class GetProductName extends DoFn<String, Map> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      {...}
      Map<String, String> properties = new HashMap<>();
      properties.put("name", "test_name");
      properties.put("sku", "test_sku");

      // Use ProcessContext.output to emit the output element.
      c.output(properties);
    }
  }

  static class FormatForDatastore extends DoFn<Map, Entity> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Map<String, String> product = new HashMap<>();
      product = c.element();
    {...}
    }
  }

  public static void main(String[] args) {
    WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    // apply transforms
    p.apply("ReadJSONLines", TextIO.read().from(options.getInputFile()))
     .apply(ParDo.of(new GetProductName()))
     .apply(ParDo.of(new FormatForDatastore()))
     {...}
   }

任何想法是什么问题?

我不确定如何解释错误。我做了一些测试,通过编辑代码来转换类型为String的PCollection而不是Map类型,它可以正常工作。但是,我想在这些转换之间传递多个键值对。欢迎提出替代方案的建议。

google-cloud-dataflow apache-beam
1个回答
0
投票

您正在使用原始类型Map。尝试使用正确的通用类型Map<String,String>

© www.soinside.com 2019 - 2024. All rights reserved.