我设置了 Kafka 连接监听消息并将它们传递给其他主题。所有消息都是 json 格式,在这些消息 json 有效负载中,您可以看到需要记录在 kafka 连接日志中的 Id,以跟踪进出 kafka 连接的消息。
我添加了一些在这个问题的上下文中可能很重要的配置: standaloneWorkerConfig:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
连接器配置:
connector.class=com.ibm.eventstreams.connect.mqsource.MQSourceConnector
max.poll.records=5
我试着给kafka connect config添加transfomer:
transfroms: logId
transforms.logId.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.logId.field=message.header.id (the json path to id element that i need to log)
然后当我使用 log4j 进行日志记录时,我添加了转换模式:
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m Id=%X{logId} (%c:%L)%n
正如您所猜到的,它不起作用,在我可以看到的每个日志中都记录了“Id=”,但没有返回 id 本身。此外,kafka 连接本身并没有传递消息。