不知道DeserializationSchema参数应该填什么 谁能帮帮我?
PubSubSource<RecordArrayInfo> source = PubSubSource.newBuilder()
.withDeserializationSchema(*WhatSchema(?)*)
.withProjectName(projectId)
.withSubscriptionName(subscriptionName)
.withCredentials(ServiceAccountCredentials.fromStream(getAutheficateFile()))
.build();
我发现有一个名为PubSubDeserializationSchema的接口,也许我可以写一个类实现这个接口可以填补上面的空白?
The deserialization schema describes how to turn the PubsubMessages into data types (Java/Scala objects) that are processed by Flink.
Type parameters:
<T> – The type created by the deserialization schema.
public interface PubSubDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {}
但是不知道参数'T'应该填什么,'类型参数: – 反序列化模式创建的类型。 这个 T 模式是发布/订阅消息模式还是用户数据模式? 我觉得可能是前面那个,因为用户数据schema不能直接读取,所以加密了。 所以如果它是发布/订阅消息模式,也许它是我不需要自己编写的常见模式? 它是什么? 非常感谢您的回答。
反序列化始终是一种获取“在线”数据(在您的情况下为 PubSub 消息)并将其转换为所需格式/表示的操作。我们可以在 JavaDocs 中看到。有一个方法需要重写:
T deserialize(com.google.pubsub.v1.PubsubMessage message) throws Exception
如您所见,它返回类型参数
T
,这是您要在应用程序中使用的类型。它接受 PubsubMessage 并且由您来实现将 PubsubMessage 转换为所需类型的逻辑。
例如:如果您将 JSON 对象存储在您的 PubSub 中,那么您可以将字节读取为 UTF-8 字符串并使用 Jackson 将其反序列化为所需的对象。您需要在实现
PubSubDeserializationSchema
. 的类中实现它
在您的情况下,您已经暗示了
T
应该是什么。你有 PubSubSource<RecordArrayInfo>
其中 RecordArrayInfo
是结果流的元素类型。在这种情况下,您需要实现这样的类:
class RecordArrayInfoDeserializer implements PubSubDeserializationSchema<RecordArrayInfo>
并覆盖所有必需的方法。
当你实现这个时,你可以像这样构建你的源代码:
PubSubSource<RecordArrayInfo> source = PubSubSource.newBuilder()
.withDeserializationSchema(new RecordArrayInfoDeserializer())
.withProjectName(projectId)
.withSubscriptionName(subscriptionName)
.withCredentials(ServiceAccountCredentials.fromStream(getAutheficateFile()))
.build();