I can't see the streaming data from Kafka to InfluxDB via Telegraf

Problem description (votes: 0, answers: 1)

I have just started using a stack made up of a series of open-source tools for data analysis: KAFKA --> Telegraf --> InfluxDB --> Grafana.

The chain is created and configured in a docker compose file. The input data is transmitted randomly by a Python script.
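For reference, a minimal sketch of what such a producer could look like (hypothetical, since the actual script is not shown in the question; the topic name "states" and the "color" field come from the telegraf.conf below, while the kafka-python library and the field values are assumptions):

# Hypothetical producer sketch; assumes the kafka-python package and the
# EXTERNAL listener advertised by the compose file on localhost:9093.
import json
import random
import time

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9093",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

while True:
    # "states" is the topic Telegraf subscribes to; "color" matches the
    # json_string_fields entry in the telegraf.conf below.
    producer.send("states", {"temperature": random.uniform(18.0, 30.0), "color": "green"})
    time.sleep(1)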

When navigating to Kafdrop (localhost:9000), I can see the streaming data divided into the predefined topics. Unfortunately, when I log into InfluxDB, I cannot see the data as expected; only the name of the bucket is there.

I have attached the docker configuration and the environment variables file.

The docker compose yaml:

version: "3.6"
services:

    zookeeper:
        image: bitnami/zookeeper:latest
        container_name: zookeeper
        ports:
        - '2181:2181'
        environment:
        - ALLOW_ANONYMOUS_LOGIN=yes
        networks:
        - kafka
    
    kafka1:
        # Pinned to 3.3.1 because of a strange bug in one of the latest Kafka versions (apparently a hardware or OS incompatibility); try a newer version again later and, if the problem is gone, go back to latest.
        image: bitnami/kafka:3.3.1
        container_name: kafka1
        ports:
        - '9093:9093'
        - '9092:9092'
        environment:
        - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
        - ALLOW_PLAINTEXT_LISTENER=yes
        - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
        - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
        - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka1:9092,EXTERNAL://localhost:9093
        - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
        - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
        depends_on:
        - zookeeper
        networks:
        - kafka
    
    kafdrop:
        image: obsidiandynamics/kafdrop:latest
        container_name: kafdrop
        ports:
        - 9000:9000
        environment:
        - KAFKA_BROKERCONNECT=kafka1:9092
        depends_on:
        - kafka1
        networks:
        - kafka
    
    ### DATABASE
    influxdb:
        image: influxdb:latest
        container_name: influxdb
        ports:
            - "8083:8083"
            - "8086:8086"
            - "8090:8090"
        depends_on:
            - zookeeper
            - kafka1
        environment: # parameter in variables.env
            DOCKER_INFLUXDB_INIT_MODE: ${DOCKER_INFLUXDB_INIT_MODE}
            DOCKER_INFLUXDB_INIT_USERNAME: ${DOCKER_INFLUXDB_INIT_USERNAME}
            DOCKER_INFLUXDB_INIT_PASSWORD: ${DOCKER_INFLUXDB_INIT_PASSWORD}
            DOCKER_INFLUXDB_INIT_ORG: ${DOCKER_INFLUXDB_INIT_ORG}
            DOCKER_INFLUXDB_INIT_BUCKET: ${DOCKER_INFLUXDB_INIT_BUCKET}
            DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: ${influxdb_token}
        networks:
          - db
        volumes:
            - ./data/influxdb/:/var/lib/influxdb
    
    telegraf:
        image: telegraf:latest
        container_name: telegraf
        environment: # configuration in /telegraf/telegraf.conf
            influxdb_token: ${influxdb_token} # parameter in variables.env
        depends_on:
            - zookeeper
            - influxdb
            - kafka1
        restart: unless-stopped
        networks:
          - kafka
          - db
        volumes:
            - ./conf/telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:ro
    
    grafana:
        image: grafana/grafana  
        container_name: grafana
        environment:
            - GF_SECURITY_ADMIN_USERNAME=admin
            - GF_SECURITY_ADMIN_PASSWORD=1234
        ports:
            - "3000:3000"
        #  volumes: 
        #      - ./grafana/data:/var/lib/grafana
        networks:
            - grafana
            - db
            - line
        depends_on:
            - telegraf
    
    # temperature-producer:
    #     build:
    #       context: ./DataSender
    #       dockerfile: dockerfile
    #     container_name: temperature-producer
    #     depends_on:
    #       - kafka1
    #     networks:
    #       - kafka
         
    ### NETWORKS

networks:
    kafka: # connection to kafka
        name: kafka
        driver: bridge
    line: # connection to linesender
        name: line
        driver: bridge
    db: # connections to the database
        name: db
        driver: bridge
    grafana:
        name: grafana
        driver: bridge

Environment variables:

DOCKER_INFLUXDB_INIT_MODE=setup
DOCKER_INFLUXDB_INIT_USERNAME=influx-admin
DOCKER_INFLUXDB_INIT_PASSWORD=ThisIsNotThePasswordYouAreLookingFor
DOCKER_INFLUXDB_INIT_ORG=ORG
DOCKER_INFLUXDB_INIT_BUCKET=system_state
influxdb_token=random_token

These are the errors shown after running docker compose:

[screenshot: docker compose errors]

I still don't understand why the data is not showing up in InfluxDB ("no tag keys found").

[screenshot: InfluxDB UI]

I want to see the data in InfluxDB.

apache-kafka influxdb influxdb-2 telegraf-plugins
1 Answer

0 votes
# Global tags can be specified here in key="value" format.
[global_tags]
  # dc = "us-east-1" # will tag all metrics with dc=us-east-1
  # rack = "1a"
  ## Environment variables can be used as tags, and throughout the config file
  # user = "$USER"


# Configuration for telegraf agent
[agent]
  ## Default data collection interval for all inputs
  interval = "10s"
  ## Rounds collection interval to 'interval'
  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
  round_interval = true

  ## Telegraf will send metrics to outputs in batches of at
  ## most metric_batch_size metrics.
  metric_batch_size = 1000
  ## For failed writes, telegraf will cache metric_buffer_limit metrics for each
  ## output, and will flush this buffer on a successful write. Oldest metrics
  ## are dropped first when this buffer fills.
  metric_buffer_limit = 10000

  ## Collection jitter is used to jitter the collection by a random amount.
  ## Each plugin will sleep for a random time within jitter before collecting.
  ## This can be used to avoid many plugins querying things like sysfs at the
  ## same time, which can have a measurable effect on the system.
  collection_jitter = "0s"

  ## Default flushing interval for all outputs. You shouldn't set this below
  ## interval. Maximum flush_interval will be flush_interval + flush_jitter
  flush_interval = "10s"
  ## Jitter the flush interval by a random amount. This is primarily to avoid
  ## large write spikes for users running a large number of telegraf instances.
  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
  flush_jitter = "0s"

  ## By default, precision will be set to the same timestamp order as the
  ## collection interval, with the maximum being 1s.
  ## Precision will NOT be used for service inputs, such as logparser and statsd.
  ## Valid values are "ns", "us" (or "µs"), "ms", "s".
  precision = ""
  ## Run telegraf in debug mode
  debug = false
  ## Run telegraf in quiet mode
  quiet = false
  ## Override default hostname, if empty use os.Hostname()
  hostname = ""
  ## If set to true, do not set the "host" tag in the telegraf agent.
  omit_hostname = false


###############################################################################
#                            OUTPUT PLUGINS                                   #
###############################################################################

# Configuration for influxdb server to send metrics to
[[outputs.influxdb_v2]]
  ## The full HTTP or UDP endpoint URL for your InfluxDB instance.
  ## Multiple urls can be specified as part of the same cluster,
  ## this means that only ONE of the urls will be written to each interval.
  # urls = ["udp://localhost:8089"] # UDP endpoint example
  urls = ["http://influxdb:8086"] ## Docker-Compose internal address
  token = "random_token" ## token name, setting from config not working
  organization = "ORG" ## orga name, setting from config not working
  bucket = "system_state" ## bucket name / db name, setting from config not working
  #database = "system_state"


  ## Write timeout (for the InfluxDB client), formatted as a string.
  ## If not provided, will default to 5s. 0s means no timeout (not recommended).
  timeout = "10s"
 




###############################################################################
#                            SERVICE INPUT PLUGINS                            #
###############################################################################

[[inputs.kafka_consumer]]
  ## Kafka brokers.
  brokers = ["kafka1:9092", "localhost:9093"] ## docker-compose internal address of kafka
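  ## Caveat: inside the telegraf container "localhost:9093" resolves to the
  ## telegraf container itself, not to the host, so only "kafka1:9092" on the
  ## shared "kafka" network is actually reachable from here.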
  
  ## Topics to consume.
  topics = [ "states"] ## topic to subscribe to

  ## When set this tag will be added to all metrics with the topic as the value.
  #topic_tag = "kafka"

  ## Optional Client id
  client_id = "kti_state" ## "username" of telegraf for kafka

  ## Set the minimal supported Kafka version.  Setting this enables the use of new
  ## Kafka features and APIs.  Must be 0.10.2.0 or greater.
  ##   ex: version = "1.1.0"
  # version = ""

  ## Optional TLS Config
  # tls_ca = "/etc/telegraf/ca.pem"
  # tls_cert = "/etc/telegraf/cert.pem"
  # tls_key = "/etc/telegraf/key.pem"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## SASL authentication credentials.  These settings should typically be used
  ## with TLS encryption enabled using the "enable_tls" option.
  # sasl_username = "kafka"
  # sasl_password = "secret"

  ## SASL protocol version.  When connecting to Azure EventHub set to 0.
  # sasl_version = 1

  ## Name of the consumer group.
  # consumer_group = "telegraf_metrics_consumers"

  ## Initial offset position; one of "oldest" or "newest".
  # offset = "oldest"

  ## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
  # balance_strategy = "range"

  ## Maximum length of a message to consume, in bytes (default 0/unlimited);
  ## larger messages are dropped
  #max_message_len = 1000000

  ## Maximum messages to read from the broker that have not been written by an
  ## output.  For best throughput set based on the number of metrics within
  ## each message and the size of the output's metric_batch_size.
  ##
  ## For example, if each message from the queue contains 10 metrics and the
  ## output metric_batch_size is 1000, setting this to 100 will ensure that a
  ## full batch is collected and the write is triggered immediately without
  ## waiting until the next flush_interval.
  # max_undelivered_messages = 1000

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json"
  ## When strict is true and a JSON array is being parsed, all objects within the
  ## array must be valid
  json_strict = true

  ## Query is a GJSON path that specifies a specific chunk of JSON to be
  ## parsed, if not specified the whole document will be parsed.
  ##
  ## GJSON query paths are described here:
  ##   https://github.com/tidwall/gjson/tree/v1.3.0#path-syntax
  json_query = ""

  ## Tag keys is an array of keys that should be added as tags.  Matching keys
  ## are no longer saved as fields.
  tag_keys = []

  ## Array of glob pattern strings keys that should be added as string fields.
  json_string_fields = ["color"]

  ## Name key is the key to use as the measurement name.
  json_name_key = ""

  ## Time key is the key containing the time that should be used to create the
  ## metric.
  json_time_key = ""

  ## Time format is the time layout that should be used to interpret the json_time_key.
  ## The time must be `unix`, `unix_ms`, `unix_us`, `unix_ns`, or a time in the
  ## "reference time".  To define a different format, arrange the values from
  ## the "reference time" in the example to match the format you will be
  ## using.  For more information on the "reference time", visit
  ## https://golang.org/pkg/time/#Time.Format
  ##   ex: json_time_format = "Mon Jan 2 15:04:05 -0700 MST 2006"
  ##       json_time_format = "2006-01-02T15:04:05Z07:00"
  ##       json_time_format = "01/02/2006 15:04:05"
  ##       json_time_format = "unix"
  ##       json_time_format = "unix_ms"
  json_time_format = ""

  ## Timezone allows you to provide an override for timestamps that
  ## don't already include an offset
  ## e.g. 04/06/2016 12:41:45
  ##
  ## Default: "" which renders UTC
  ## Options are as follows:
  ##   1. Local               -- interpret based on machine localtime
  ##   2. "America/New_York"  -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
  ##   3. UTC                 -- or blank/unspecified, will return timestamp in UTC
  json_timezone = ""