我正在尝试使用动态模式处理数据事件(来自 pubsub 的流数据)。架构可以更改,并且我正在以架构 id 键控的架构注册表中维护架构。
我将把架构注册表作为旁输入传递给 pardo 转换,该转换将处理我的数据并创建事件类型的 pcollection 并将其添加到元组中。
我处理的事件数据应有一个 schemaid,可用于键入 schema 注册表并序列化数据 (jsontoRow)。我还将 schema-id 与数据一起使用。
PCollectionTuple output = input.apply("FromJson", ParDo.of(
new DoFn<String, EventWrapper<String, RowWrapper>>() {
@ProcessElement
public void processElement(ProcessContext context, @Element String inputRecord,
MultiOutputReceiver out) {
Map<String, Map<String, Schema>> schemaMap = context.sideInput(schemaMapView);
JSONObject obj = new JSONObject(inputRecord);
String schemaIdStr =
obj.getJSONObject(Schema).getString("schemaid");
Schema inputSchema = schemaMap.get(schemaIdStr);
Row row = jsonToRow(newObjectMapperWith(
RowJsonDeserializer.forSchema(inputSchema)),
obj.getJSONObject("data").getJSONObject("payload").toString());
String eventName = obj.getJSONObject("data").getJSONObject("payload").getJSONObject("event").toString();
TupleTag<EventWrapper<String, RowWrapper>> tupleTagInUse = TupleTagGenerator.getTupleTag(eventName);
if (tupleTagInUse == null) {
tupleTagInUse = validEvents;
}
RowWrapper rowData = new RowWrapper(row,schemaIdStr);
out.get(tupleTagInUse).output(new EventWrapper<>(inputRecord, rowData));
}
}
}).withSideInputs(schemaMapView)
.withOutputTags(validEvents, TupleTagList.of(invalidEvents)
.and(new ArrayList<TupleTag<?>>(tupleTagList))));
我现在需要为从上面 pardo 创建的每个 pCollection 设置编码器。我无权访问上下文之外的架构图。但我确实在 rowWrapper 中存储了 schema-id,并且 Beam row 应该维护它用于序列化数据的架构。
Q1:我可以在 pcollectionTuple 上运行一些转换来处理每个 pcollection,以从 pcollection 中存储的行数据中提取模式并在 pcollection 对象上执行 setcoder 吗?
Q2:我可以在 parDO 中创建一个侧面输出、一个键值对或哈希映射来维护架构 ID 到架构映射,以便我可以在 pardo 外部使用该映射吗?
Q3:我可以为在 pardo 中创建的 Pcollection 设置编码器,其中我有可用的架构对象吗?
编码器(及其关联的模式)必须作为管道构建的一部分进行设置,并且不能依赖于数据本身(数据仅在管道完全定义、优化并发送执行后才存在)。
如果您需要这样的动态数据,则需要使用某种容器类型,其 Coder 可以处理动态字段。 (由于您已经在使用 JSON,因此您可以考虑继续对动态架构数据执行此操作。)
@user18207952 我面临类似的问题。您能分享一下这个问题是如何解决的吗?谢谢