当我想要使用的模式数据是 pcollection 本身的一部分时,如何在 PcollectionTuple 中为 Pcollection 设置编码器

问题描述 投票:0回答:2

我正在尝试使用动态模式处理数据事件(来自 pubsub 的流数据)。架构可以更改,并且我正在以架构 id 键控的架构注册表中维护架构。

我将把架构注册表作为旁输入传递给 pardo 转换,该转换将处理我的数据并创建事件类型的 pcollection 并将其添加到元组中。

我处理的事件数据应有一个 schemaid,可用于键入 schema 注册表并序列化数据 (jsontoRow)。我还将 schema-id 与数据一起使用。

PCollectionTuple output = input.apply("FromJson", ParDo.of(
                    new DoFn<String, EventWrapper<String, RowWrapper>>() {

                        @ProcessElement
                        public void processElement(ProcessContext context, @Element String inputRecord,
                                                   MultiOutputReceiver out) {
                      
                                Map<String, Map<String, Schema>> schemaMap = context.sideInput(schemaMapView);
                                
                                    JSONObject obj = new JSONObject(inputRecord);
                                    String schemaIdStr =
                                            obj.getJSONObject(Schema).getString("schemaid");


                                    Schema inputSchema = schemaMap.get(schemaIdStr);
                                    Row row = jsonToRow(newObjectMapperWith(
                                            RowJsonDeserializer.forSchema(inputSchema)),
                                            obj.getJSONObject("data").getJSONObject("payload").toString());

                                    String eventName = obj.getJSONObject("data").getJSONObject("payload").getJSONObject("event").toString();

                                    TupleTag<EventWrapper<String, RowWrapper>> tupleTagInUse = TupleTagGenerator.getTupleTag(eventName);
                                    if (tupleTagInUse == null) {
                                        tupleTagInUse = validEvents;
                                    }
                                    RowWrapper rowData = new RowWrapper(row,schemaIdStr);
                                    out.get(tupleTagInUse).output(new EventWrapper<>(inputRecord, rowData));
                          
                            } 
                        }
                    }).withSideInputs(schemaMapView)
                    .withOutputTags(validEvents, TupleTagList.of(invalidEvents)
                                    .and(new ArrayList<TupleTag<?>>(tupleTagList))));

我现在需要为从上面 pardo 创建的每个 pCollection 设置编码器。我无权访问上下文之外的架构图。但我确实在 rowWrapper 中存储了 schema-id,并且 Beam row 应该维护它用于序列化数据的架构。

Q1:我可以在 pcollectionTuple 上运行一些转换来处理每个 pcollection,以从 pcollection 中存储的行数据中提取模式并在 pcollection 对象上执行 setcoder 吗?

Q2:我可以在 parDO 中创建一个侧面输出、一个键值对或哈希映射来维护架构 ID 到架构映射,以便我可以在 pardo 外部使用该映射吗?

Q3:我可以为在 pardo 中创建的 Pcollection 设置编码器,其中我有可用的架构对象吗?

java serialization apache-beam
2个回答
0
投票

编码器(及其关联的模式)必须作为管道构建的一部分进行设置,并且不能依赖于数据本身(数据仅在管道完全定义、优化并发送执行后才存在)。

如果您需要这样的动态数据,则需要使用某种容器类型,其 Coder 可以处理动态字段。 (由于您已经在使用 JSON,因此您可以考虑继续对动态架构数据执行此操作。)


0
投票

@user18207952 我面临类似的问题。您能分享一下这个问题是如何解决的吗?谢谢

© www.soinside.com 2019 - 2024. All rights reserved.