我今天正在为我正在从事的 Flink 工作寻求一些建议(希望是一个解决方案)。作业本身从 Kafka 主题读取 JSON 字符串,并将其读入下面
JsonObject
中的 SanitizeFunction()
实例,然后在整个管道中对其进行操作,最终再次写入 Kafka:
stream
.fromSource(
KafkaSource.builder<String>()
...
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer::class.java)),
WatermarkStrategy.noWatermarks(),
"incoming-messages-from-kafka",
TypeInformation.of(String::class.java)
)
.process(SanitizeFunction())
...
.process(MoreProcessOperationsAgainstJsonObjects())
...
.sinkTo(...)
这里的问题是数据的形状可以剧烈和动态变化。某些记录可能具有仅该记录独有的属性,这使得定义 POJO 变得困难。除此之外,
JsonObject
退回到 Kryo 序列化,这导致了可怕的吞吐量。
我正在寻找 some 方法来读取这些 JSON 字符串,丰富这些对象的属性,并最终将它们写入各种接收器。是否有某种类型的基于 JSON 的类或库或我可以用来以有效方式完成此任务的方法?或者,如果可能有一种方法来部分编写 POJO,允许我与 JSON 的部分/属性进行交互,同时保留可能动态存在或消息独有的其他属性?
考虑的一些想法/尝试:
欢迎任何意见或建议!如果有帮助,我也很乐意提供任何其他背景信息!