使用JSON字段值作为带有文件拍子的Kafka主题的记录键

问题描述 投票:0回答:1

我在文件中有一个事件(JSON消息),需要通过filebeat发送到Kafka。JSON消息如下所示:

{"time":1582213700.001,"interval":"2s","worker":11,"application":"1.1.1.1"}

我想将此邮件发送给Kafka。分区密钥应该是JSON事件消息中的application字段。如何在JSON消息中提供自定义应用程序字段作为Kafka记录的部分密钥?

像这样的filebeat.yml:

…
        output.kafka:
          version: 0.10.1
          hosts: ["{KAFKA1}:9092", "{KAFKA2}:9092", "{KAFKA3}:9092"]
          topic: '%{[log_topic]}'
          codec.format:
            string: '%{[message]}'
          key: '%{[message.application]:default}'
          partition.hash:
            hash: []
            random: true # if false non-hashable events will be dropped
          required_acks: 1
          compression: none

https://www.elastic.co/guide/en/beats/libbeat/6.8/config-file-format-type.html#_format_string_sprintf根据此参考,我们可以使用格式字符串规范来引用事件字段值。

使用此配置,默认消息“默认”始终报告为密钥。如何配置filebeat.yml以提取自定义应用程序字段并将此信息用作Kafka分区密钥?

此外,我尝试如下在输入部分中定义一个文件:

type: log
  enabled: true
  paths:
  - /var/logs/*event.log
  fields:
    log_topic: "event"
    application: '%{[application]} string'
  fields_under_root: true

和相应的kafka输出为:

output.kafka:
  version: 0.10.1
  hosts: ["{KAFKA1}:9092", "{KAFKA2}:9092", "{KAFKA3}:9092"]
  topic: '%{[log_topic]}'
  codec.format:
    string: '%{[message]}'
  key: '%{[application]:default}'

  partition.hash:
    hash: []
    random: true # if false non-hashable events will be dropped
  required_acks: 1

但是然后,kafka分区密钥始终是:%{[application]}字符串

json apache-kafka elastic-stack filebeat elastic-beats
1个回答
0
投票

假设实际上已解析JSON,我想您想要的

key: '%{[fields.application]:default}'

请参见示例-https://www.elastic.co/guide/en/beats/filebeat/6.8/kafka-output.html#topic-option-kafka

您可能也有兴趣添加主机元数据-https://www.elastic.co/guide/en/beats/filebeat/6.8/add-host-metadata.html

和解码JSON-https://www.elastic.co/guide/en/beats/filebeat/6.8/decode-json-fields.html

© www.soinside.com 2019 - 2024. All rights reserved.