Kafka Connect FileStreamSink 连接器删除 JSON 消息的引号并将冒号更改为等号

问题描述 投票:0回答:2

总结

当我与控制台制作人一起传输此内容时

{"id":1337,"status":"example_topic_1 success"}

我从我的文件流消费者那里得到这个

/数据/example_topic_1.txt

{id=1337, status=example_topic_1 success}

这对我来说是一个主要问题,因为如果不假设引号曾经在哪里,就无法恢复原始 JSON 消息。如何将消息输出到文件,同时保留引号?

详情

  1. 首先,我启动文件接收器连接器。
    # sh bin/connect-standalone.sh \
    >   config/worker.properties \
    >   config/connect-file-sink-example_topic_1.properties
    
  2. 其次,我启动控制台消费者(也内置于 Kafka),以便我可以轻松地通过视觉确认消息是否正确传送。
    # sh bin/kafka-console-consumer.sh \
    >   --bootstrap-server kafka_broker:9092 \
    >   --topic example_topic_1
    
  3. 最后,我启动一个控制台生成器来发送消息,然后输入一条消息。

    # sh bin/kafka-console-producer.sh \
    >   --broker-list kafka_broker:9092 \
    >   --topic example_topic_1
    

    从控制台消费者,消息正确弹出,并带有引号。

    {"id":1337,"status":"example_topic_1 success"}
    

    但是我从 FileStreamSink 消费者那里得到了这个:

    /数据/example_topic_1.txt

    {id=1337, status=example_topic_1 success}
    

我的配置

配置/worker.properties

offset.storage.file.filename=/tmp/example.offsets

bootstrap.servers=kafka_broker:9092
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

config/connect-file-sink-example_topic_1.properties

name=file-sink-example_topic_1
connector.class=FileStreamSink
tasks.max=1
file=/data/example_topic_1.txt
topics=example_topic_1
apache-kafka apache-kafka-connect
2个回答
4
投票

由于您实际上并不想解析 JSON 数据,而只是将其作为文本块直接传递,因此您需要使用 StringConverter:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

本文详细解释了转换器的细微差别:https://rmoff.net/2019/05/08/when-a-kafka-connect-converter-is-not-a-converter/。这显示了您尝试执行的操作的示例,尽管使用

kafkacat
代替控制台生产者/消费者。


0
投票

我在尝试解决类似问题时遇到了这个问题,虽然它很旧,但我想发布我的答案:

我的解决方案涉及创建我自己的

org.apache.kafka.connect.sink.SinkTask
实现。在处理
Sink
集合的
org.apache.kafka.connect.sink.SinkRecord
中,我使用 Jackson
Map
转换
JsonConverter
ObjectMapper
正在输出):

因此 connect-standalone.properties 包含以下内容:

value.converter=org.apache.kafka.connect.json.JsonConverter

然后在我的类中扩展了 SinkTask:

for (SinkRecord record : records) {
    Object recordValue = record.value();

    if (recordValue instanceof Map) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            recordValue = objectMapper.writeValueAsString(recordValue);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

这在您的示例中意味着:

{"id":1337,"status":"example_topic_1 success"}

相应地重新格式化为有效的 json 字符串 - 这意味着您仍然可以使用 JsonConverter,这意味着您仍然可以在转换中处理 JSON。我正在使用

org.apache.kafka.connect.transforms.ExtractField$Value
来提取 json 的一部分作为值。

© www.soinside.com 2019 - 2024. All rights reserved.