从消息负载中记录 Kafka Connect 中的特定属性

问题描述 投票:0回答:0

我设置了 Kafka 连接监听消息并将它们传递给其他主题。所有消息都是 json 格式,在这些消息 json 有效负载中,您可以看到需要记录在 kafka 连接日志中的 Id,以跟踪进出 kafka 连接的消息。

我添加了一些在这个问题的上下文中可能很重要的配置: standaloneWorkerConfig:

    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false

连接器配置:

connector.class=com.ibm.eventstreams.connect.mqsource.MQSourceConnector
max.poll.records=5

我试着给kafka connect config添加transfomer:

transfroms: logId
transforms.logId.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.logId.field=message.header.id (the json path to id element that i need to log)

然后当我使用 log4j 进行日志记录时,我添加了转换模式:

    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m Id=%X{logId} (%c:%L)%n

正如您所猜到的,它不起作用,在我可以看到的每个日志中都记录了“Id=”,但没有返回 id 本身。此外,kafka 连接本身并没有传递消息。

logging apache-kafka apache-kafka-connect trace
© www.soinside.com 2019 - 2024. All rights reserved.