我想在 Beam PCollections 中使用旧版 API 中的 Java 类。我专门与 PCollection of Row 合作。我无法访问旧版 API 的源代码,因此无法对其进行注释或声明它们可序列化。它的类一般不实现Serialized。我收到的错误通常表明有问题的遗留类无法序列化。
我已经为遗留类创建了逻辑类型。 Beam 似乎接受它们作为 Row 组件,并且我可以创建和打印 PCollections,包括遗留类。然而,当我尝试转换 PCollections 时,我通常会遇到不可序列化的异常。
我需要使用的转换包括 AddFields、Group.byFieldNames.aggregateField 和 Join。现在,当我尝试使用遗留类(链中的第一个转换)添加字段时,我遇到了不可序列化的异常。
我已在管道中注册了包含旧组件的架构。我使用 ObjectInput/OutputStream 类的 readObject 和 writeObject 方法为遗留类创建了自定义编码器,并将它们注册到管道中。
对于如何处理 Beam 中的遗留类,有什么一般性建议吗?欢迎提供示例。
谢谢。
您提到您已经为遗留类创建了自定义编码器。这是一个好方法。创建自定义编码器时,您应该确保正确处理遗留类的序列化和反序列化。您可以通过扩展
Coder<T\>
并覆盖 encode
和 decode
方法来实现自定义编码器。
遗留类的自定义编码器示例:
public class LegacyClassCoder extends CustomCoder<LegacyClass> {
@Override
public void encode(LegacyClass value, OutputStream outStream) {
// Serialize the LegacyClass to bytes and write them to outStream
}
@Override
public LegacyClass decode(InputStream inStream) {
// Read bytes from inStream and deserialize them to a LegacyClass instance
}
}
您表示您已经为遗留类设计了逻辑类型,这有利于封装序列化逻辑并简化与这些遗留类的交互。此外,beam 允许使用这些逻辑类型作为 Row PCollection 中的元素。如果逻辑类型有效地管理序列化和反序列化过程,这种方法是合理的。
使用
Group.byFieldNames.aggregateField
或 Join
等操作时,避免使用旧类作为关键字段。相反,创建使用可序列化类型的中间键。