Serialization problem with Apache Beam and BigQuery TableSchema

Thank you for your support.

I am currently experimenting with Apache Beam to learn as much as I can about how it works. I am running into a problem with the serialization of com.google.api.services.bigquery.model.TableSchema. I have read that TableSchema cannot be serialized, and I am exploring potential workarounds for this challenge. So far, I have this superclass:

public class TransformBasic extends DoFn<String, TableRow> {

    private static final Logger LOG = LoggerFactory.getLogger(TransformBasic.class);

    protected final TupleTag<TableRow> outputTag;
    protected final TupleTag<TableRow> failureTag;

    // transient: com.google.api.services.bigquery.model.TableSchema is not Serializable
    private transient com.google.api.services.bigquery.model.TableSchema tableSchema;
    private String whoAmI;
    private String header;

    public TransformBasic(String whoAmI, String header, TupleTag<TableRow> outputTag, TupleTag<TableRow> failureTag) {
        this.whoAmI = whoAmI;
        this.header = header;
        this.outputTag = outputTag;
        this.failureTag = failureTag;
    }

    public TransformBasic() {
        this.outputTag = null;
        this.failureTag = null;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Rebuild the transient schema on the worker (reassigned for every element)
        tableSchema = getTableSchemaByName();

        // Csv line
        String line = c.element();

        // BigQuery row
        TableRow row = new TableRow();

        // Csv parser
        CSVParser csvParser = new CSVParserBuilder()
                .withSeparator(',')
                .build();

        try {
            // If csv line is different from header
            if (!line.equalsIgnoreCase(header)) {
                String[] csvColumns = csvParser.parseLine(line);
                for (int i = 0; i < csvColumns.length; i++) {
                    TableFieldSchema bigqueryColumn = tableSchema.getFields().get(i);
                    // replaceCharacters(...) is defined elsewhere (omitted for brevity)
                    String newBigqueryColumns = replaceCharacters(bigqueryColumn.getName());
                    row.set(newBigqueryColumns, csvColumns[i]);
                }
                c.output(outputTag, row);
            }
        } catch (Exception e) {
            LOG.error("FAILURE in " + whoAmI.toUpperCase() + ": " + e);

            Failure failure = new Failure(
                    LocalDate.now().toString(),
                    whoAmI,
                    line,
                    e.toString());

            c.output(failureTag, failure.getAsTableRow());
        }
    }

    //****************************
    // HERE IS THE SERIALIZATION WORKAROUND SOLUTION (NOT WORKING) ----------------------------
    //****************************
    protected com.google.api.services.bigquery.model.TableSchema getTableSchemaByName() {

        // Use the whoAmI variable to determine which schema to build
        // (equalsIgnoreCase, since the subclasses use lowercase identifiers)
        if ("L".equalsIgnoreCase(whoAmI)) {
            return TableSchema.getTableSchemaL();
        } else if ("P".equalsIgnoreCase(whoAmI)) {
            return TableSchema.getTableSchemaP();
        } else {
            // Handle other cases or throw an exception
            throw new RuntimeException("Unknown class: " + whoAmI);
        }
    }
}

Then I create the subclass:

public class TransformL extends TransformBasic {

    public static final TupleTag<TableRow> OUTPUT_TAGS_L = new TupleTag<TableRow>() {};
    public static final TupleTag<TableRow> FAILURE_TAGS_L = new TupleTag<TableRow>() {};
    private static final String WHO_AM_I = "l";
    private static final String HEADER_L = "metric,scen,sec,year,value,id";

    public TransformL() {
        super(WHO_AM_I, HEADER_L, OUTPUT_TAGS_L, FAILURE_TAGS_L);
    }
}

The table schema helper I am using is:

public class TableSchema {

    public static com.google.api.services.bigquery.model.TableSchema getTableSchemaL() {
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("metric"));
        fields.add(new TableFieldSchema().setName("scen"));
        fields.add(new TableFieldSchema().setName("sec"));
        fields.add(new TableFieldSchema().setName("year"));
        fields.add(new TableFieldSchema().setName("value"));
        fields.add(new TableFieldSchema().setName("id"));

        return new com.google.api.services.bigquery.model.TableSchema().setFields(fields);
    }
}
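As a side note, for an actual load into BigQuery each field would also need a type; a hedged variant of the helper's first lines, assuming every CSV column should land as a STRING (adjust per column as needed):

// Assumption: all columns are plain strings; not part of my actual code
fields.add(new TableFieldSchema().setName("metric").setType("STRING"));
fields.add(new TableFieldSchema().setName("scen").setType("STRING"));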

I am testing this implementation like this:

public class TransformLitUwTest {

    private static final Logger LOG = LoggerFactory.getLogger(TransformLitUwTest.class);

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void testTransformL() {

        // Create some sample input data
        String fakeInput = "metric,scen,sec,year,value,id\n" +
                "metric1,scen1,10,2022,100,id1\n" +
                "metric2,scen2,20,2022,200,id2";
        PCollection<String> inputCollection = pipeline
                .apply("CreateFakeInput", org.apache.beam.sdk.transforms.Create.of(fakeInput));

        // Apply the TransformL transformation
        PCollectionTuple outputTuple = inputCollection.apply("TransformL",
                ParDo.of(new TransformL())
                        .withOutputTags(TransformL.OUTPUT_TAGS_L,
                                TupleTagList.of(TransformL.FAILURE_TAGS_L)));

        // Get the PCollection with successful outputs
        PCollection<TableRow> outputCollection = outputTuple.get(TransformL.OUTPUT_TAGS_L);

        // Log the contents of the PCollection
        outputCollection.apply(ParDo.of(new DoFn<TableRow, Void>() {
            @ProcessElement
            public void processElement(@Element TableRow c) {
                // Print each TableRow to the console
                LOG.info("TABLE ROW -------------------------------------> " + c.toString());
            }
        }));

        // Logic for assert equal (implementation omitted)

        // Run the pipeline
        pipeline.run().waitUntilFinish();
    }
}
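Since the assert-equal logic is omitted above, here is a minimal sketch of what it could look like with PAssert, assuming replaceCharacters leaves these column names unchanged and that the fake input is fed one CSV line per element (which is fixed in the answer below); the expected rows are illustrative, not verified output:

// requires: import org.apache.beam.sdk.testing.PAssert;
// Expected rows built by hand from the fake CSV above (all values are strings,
// since the DoFn sets the raw CSV columns on the TableRow)
TableRow expected1 = new TableRow().set("metric", "metric1").set("scen", "scen1")
        .set("sec", "10").set("year", "2022").set("value", "100").set("id", "id1");
TableRow expected2 = new TableRow().set("metric", "metric2").set("scen", "scen2")
        .set("sec", "20").set("year", "2022").set("value", "200").set("id", "id2");

// PAssert registers the check; it actually runs with pipeline.run()
PAssert.that(outputCollection).containsInAnyOrder(expected1, expected2);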

For clarity, I removed some code, but I will include the full code if necessary.

The initial workaround involved creating a class in the parent that would fetch the TableSchema and use it in a parent method. However, this approach still does not work, and the debugging output did not give me a clear enough picture of where the change needs to be made. I want to keep the idea of having a superclass, even though it is not strictly necessary for the table schemas I have, because I also want to try out the inheritance approach. Am I missing something?
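For reference, the kind of workaround I keep reading about holds the schema in a serializable form (a JSON String) and rebuilds the real TableSchema lazily on the worker. A minimal sketch of that idea, assuming the google-api-client JacksonFactory is on the classpath (class and field names here are illustrative, not my actual code):

import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import org.apache.beam.sdk.transforms.DoFn;

public abstract class SchemaAwareFn extends DoFn<String, TableRow> {

    // Strings serialize fine, so ship the schema as JSON instead of the object
    private final String schemaJson;
    private transient TableSchema tableSchema;

    protected SchemaAwareFn(TableSchema schema) {
        try {
            this.schemaJson = JacksonFactory.getDefaultInstance().toString(schema);
        } catch (IOException e) {
            throw new RuntimeException("Could not render schema as JSON", e);
        }
    }

    @Setup
    public void setup() throws IOException {
        // Rebuild the non-serializable TableSchema once per DoFn instance on the
        // worker, instead of once per element in @ProcessElement
        tableSchema = JacksonFactory.getDefaultInstance().fromString(schemaJson, TableSchema.class);
    }

    protected TableSchema getTableSchema() {
        return tableSchema;
    }
}

If I understand correctly, Beam's BigQuery IO ships similar helpers (BigQueryHelpers.toJsonString / fromJsonString), so that could be an option too.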

csv serialization google-bigquery pipeline apache-beam
1 Answer

I did some more research on serialization and fixed the error that was showing up in the console. Basically, I also needed to make the test class serializable. This is because of the way I implemented the classes and the various pipelines: the anonymous DoFn in the test captures a reference to the enclosing test class, and I also pass the tableSchema along at the test-class level, so the test class itself has to be Serializable. So I changed the class as follows:

public class TransformLitUwTest implements Serializable {
    // the rest of the implementation goes here
}
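An alternative to making the whole test class Serializable is to avoid the capture in the first place: the anonymous logging DoFn holds a hidden reference to the enclosing test instance, so pulling it out into a static nested class removes the requirement. A sketch (LogRowFn is a name I made up):

// Static nested class: no hidden reference to the enclosing test instance,
// so only the DoFn itself has to be serializable (DoFn already is)
static class LogRowFn extends DoFn<TableRow, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(LogRowFn.class);

    @ProcessElement
    public void processElement(@Element TableRow row) {
        LOG.info("TABLE ROW -------------------------------------> " + row);
    }
}

// usage in the test:
// outputCollection.apply(ParDo.of(new LogRowFn()));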

During the implementation, at this point I also discovered another error:

    // Apply the TransformL transformation
    PCollectionTuple outputTuple = inputCollection.apply("TransformL", ParDo.of(new TransformL())
            .withOutputTags(TransformL.OUTPUT_TAGS_L, TupleTagList.of(TransformL.FAILURE_TAGS_L)));

In this case, because the DoFn behind TransformL() receives a String and returns a TableRow (one per CSV line), each element must be a single line, so I had to read the "fake CSV" as an array of strings in the following way:

    PCollection<String> inputCollection = pipeline
            .apply("CreateFakeInput", org.apache.beam.sdk.transforms.Create.of(Arrays.asList(fakeInput.split("\n"))));

I hope this information helps anyone facing serialization problems and gives them some guidance on how to solve them.
