在 Flink 管道中高效处理 JSON 消息

问题描述 投票:0回答:0

我今天正在为我正在从事的 Flink 工作寻求一些建议(希望是一个解决方案)。作业本身从 Kafka 主题读取 JSON 字符串,并将其读入下面

JsonObject
中的
SanitizeFunction()
实例,然后在整个管道中对其进行操作,最终再次写入 Kafka:

stream
    .fromSource(
        KafkaSource.builder<String>()
           ...
           .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer::class.java)),
        WatermarkStrategy.noWatermarks(),
        "incoming-messages-from-kafka",
        TypeInformation.of(String::class.java)
    )
    .process(SanitizeFunction())
    ...
    .process(MoreProcessOperationsAgainstJsonObjects())
    ...
    .sinkTo(...)

这里的问题是数据的形状可以剧烈和动态变化。某些记录可能具有仅该记录独有的属性,这使得定义 POJO 变得困难。除此之外,

JsonObject
退回到 Kryo 序列化,这导致了可怕的吞吐量。

我正在寻找 some 方法来读取这些 JSON 字符串,丰富这些对象的属性,并最终将它们写入各种接收器。是否有某种类型的基于 JSON 的类或库或我可以用来以有效方式完成此任务的方法?或者,如果可能有一种方法来部分编写 POJO,允许我与 JSON 的部分/属性进行交互,同时保留可能动态存在或消息独有的其他属性?

考虑的一些想法/尝试:

  • 也许将 JSON 负载扁平化为一系列地图/嵌套地图?尽管由于数据变化,这可能很难确保正确保留打字等。
  • 某种形式的 partial 序列化,它将原始消息有效负载保留为字符串,并且仅序列化在管道期间“接触”的属性,然后在写入之前将其合并到最终更新的字符串中。
  • 纯粹使用字符串(这似乎是高效的,但从中读取值/进行更改几乎肯定是一场噩梦)。
  • 利用可能针对某些类似用例量身定制的其他一些受支持的数据结构/类。

欢迎任何意见或建议!如果有帮助,我也很乐意提供任何其他背景信息!

json performance serialization apache-flink flink-streaming
© www.soinside.com 2019 - 2024. All rights reserved.