如何在 PCollection 中使用遗留类?

问题描述 投票:0回答:1

我想在 Beam PCollections 中使用旧版 API 中的 Java 类。我专门与 PCollection of Row 合作。我无法访问旧版 API 的源代码,因此无法对其进行注释或声明它们可序列化。它的类一般不实现Serialized。我收到的错误通常表明有问题的遗留类无法序列化。

我已经为遗留类创建了逻辑类型。 Beam 似乎接受它们作为 Row 组件,并且我可以创建和打印 PCollections,包括遗留类。然而,当我尝试转换 PCollections 时,我通常会遇到不可序列化的异常。

我需要使用的转换包括 AddFields、Group.byFieldNames.aggregateField 和 Join。现在,当我尝试使用遗留类(链中的第一个转换)添加字段时,我遇到了不可序列化的异常。

我已在管道中注册了包含旧组件的架构。我使用 ObjectInput/OutputStream 类的 readObject 和 writeObject 方法为遗留类创建了自定义编码器,并将它们注册到管道中。

对于如何处理 Beam 中的遗留类,有什么一般性建议吗?欢迎提供示例。

谢谢。

java apache-beam
1个回答
0
投票

您提到您已经为遗留类创建了自定义编码器。这是一个好方法。创建自定义编码器时,您应该确保正确处理遗留类的序列化反序列化。您可以通过扩展

Coder<T\>
并覆盖
encode
decode
方法来实现自定义编码器。

遗留类的自定义编码器示例:

public class LegacyClassCoder extends CustomCoder<LegacyClass> {
    @Override
    public void encode(LegacyClass value, OutputStream outStream) {
    // Serialize the LegacyClass to bytes and write them to outStream
    }

    @Override
    public LegacyClass decode(InputStream inStream) {
    // Read bytes from inStream and deserialize them to a LegacyClass instance
    }
}

您表示您已经为遗留类设计了逻辑类型,这有利于封装序列化逻辑并简化与这些遗留类的交互。此外,beam 允许使用这些逻辑类型作为 Row PCollection 中的元素。如果逻辑类型有效地管理序列化和反序列化过程,这种方法是合理的。

使用

Group.byFieldNames.aggregateField
Join
等操作时,避免使用旧类作为关键字段。相反,创建使用可序列化类型的中间键。

© www.soinside.com 2019 - 2024. All rights reserved.