I have just started working with a stack of open-source software for data analysis: Kafka --> Telegraf --> InfluxDB --> Grafana.
The chain is created and configured in a docker-compose file. The input data is produced randomly by a Python script.
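Roughly, the producer does the following (a minimal sketch, assuming the kafka-python client; the field names "temperature" and "color" and the value ranges are illustrative, not my exact code):

```python
import json
import random
import time

# Each message is a flat JSON object; "color" is the only string field
# (it is listed in json_string_fields in telegraf.conf below).
def make_reading():
    return {
        "temperature": round(random.uniform(15.0, 30.0), 2),
        "color": random.choice(["red", "green", "blue"]),
    }

if __name__ == "__main__":
    # Actually sending requires a running broker, e.g. with kafka-python:
    # from kafka import KafkaProducer
    # producer = KafkaProducer(
    #     bootstrap_servers="localhost:9093",  # EXTERNAL listener
    #     value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    # )
    # producer.send("states", make_reading())
    for _ in range(3):
        print(json.dumps(make_reading()))
        time.sleep(0.1)
```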
When I navigate to Kafdrop (localhost:9000) I can see the streamed data, divided into the predefined topics. Unfortunately, when I log into InfluxDB I cannot see the data as expected; only the name of the bucket exists.
I have attached the docker-compose configuration and the environment-variables file.
docker-compose yaml:
version: "3.6"
services:
  zookeeper:
    image: bitnami/zookeeper:latest
    container_name: zookeeper
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    networks:
      - kafka
  kafka1:
    # Pinned to 3.3.1 because of a strange bug in one of the latest Kafka
    # versions, apparently caused by a hardware or OS incompatibility. Try the
    # newest version again later and switch back once the problem is gone.
    image: bitnami/kafka:3.3.1
    container_name: kafka1
    ports:
      - '9093:9093'
      - '9092:9092'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka1:9092,EXTERNAL://localhost:9093
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
    depends_on:
      - zookeeper
    networks:
      - kafka
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    ports:
      - 9000:9000
    environment:
      - KAFKA_BROKERCONNECT=kafka1:9092
    depends_on:
      - kafka1
    networks:
      - kafka
  ### DATABASE
  influxdb:
    image: influxdb:latest
    container_name: influxdb
    ports:
      - "8083:8083"
      - "8086:8086"
      - "8090:8090"
    depends_on:
      - zookeeper
      - kafka1
    environment: # parameters in variables.env
      DOCKER_INFLUXDB_INIT_MODE: ${DOCKER_INFLUXDB_INIT_MODE}
      DOCKER_INFLUXDB_INIT_USERNAME: ${DOCKER_INFLUXDB_INIT_USERNAME}
      DOCKER_INFLUXDB_INIT_PASSWORD: ${DOCKER_INFLUXDB_INIT_PASSWORD}
      DOCKER_INFLUXDB_INIT_ORG: ${DOCKER_INFLUXDB_INIT_ORG}
      DOCKER_INFLUXDB_INIT_BUCKET: ${DOCKER_INFLUXDB_INIT_BUCKET}
      DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: ${influxdb_token}
    networks:
      - db
    volumes:
      - ./data/influxdb/:/var/lib/influxdb
  telegraf:
    image: telegraf:latest
    container_name: telegraf
    environment: # configuration in /telegraf/telegraf.conf
      influxdb_token: ${influxdb_token} # parameter in variables.env
    depends_on:
      - zookeeper
      - influxdb
      - kafka1
    restart: unless-stopped
    networks:
      - kafka
      - db
    volumes:
      - ./conf/telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:ro
  grafana:
    image: grafana/grafana
    container_name: grafana
    environment:
      - GF_SECURITY_ADMIN_USERNAME=admin
      - GF_SECURITY_ADMIN_PASSWORD=1234
    ports:
      - "3000:3000"
    # volumes:
    #   - ./grafana/data:/var/lib/grafana
    networks:
      - grafana
      - db
      - line
    depends_on:
      - telegraf
  # temperature-producer:
  #   build:
  #     context: ./DataSender
  #     dockerfile: dockerfile
  #   container_name: temperature-producer
  #   depends_on:
  #     - kafka1
  #   networks:
  #     - kafka

### NETWORKS
networks:
  kafka: # connection to kafka
    name: kafka
    driver: bridge
  line: # connection to linesender
    name: line
    driver: bridge
  db: # connections to the database
    name: db
    driver: bridge
  grafana:
    name: grafana
    driver: bridge
Environment variables (variables.env):
DOCKER_INFLUXDB_INIT_MODE=setup
DOCKER_INFLUXDB_INIT_USERNAME=influx-admin
DOCKER_INFLUXDB_INIT_PASSWORD=ThisIsNotThePasswordYouAreLookingFor
DOCKER_INFLUXDB_INIT_ORG=ORG
DOCKER_INFLUXDB_INIT_BUCKET=system_state
influxdb_token=random_token
I still do not understand why the data does not show up in InfluxDB (it reports "no tag keys found").
I want to see the data in InfluxDB. Here is my telegraf.conf:
# Global tags can be specified here in key="value" format.
[global_tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1
# rack = "1a"
## Environment variables can be used as tags, and throughout the config file
# user = "$USER"
# Configuration for telegraf agent
[agent]
## Default data collection interval for all inputs
interval = "10s"
## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
## Telegraf will send metrics to outputs in batches of at
## most metric_batch_size metrics.
metric_batch_size = 1000
## For failed writes, telegraf will cache metric_buffer_limit metrics for each
## output, and will flush this buffer on a successful write. Oldest metrics
## are dropped first when this buffer fills.
metric_buffer_limit = 10000
## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"
## Default flushing interval for all outputs. You shouldn't set this below
## interval. Maximum flush_interval will be flush_interval + flush_jitter
flush_interval = "10s"
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
## By default, precision will be set to the same timestamp order as the
## collection interval, with the maximum being 1s.
## Precision will NOT be used for service inputs, such as logparser and statsd.
## Valid values are "ns", "us" (or "µs"), "ms", "s".
precision = ""
## Run telegraf in debug mode
debug = false
## Run telegraf in quiet mode
quiet = false
## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do not set the "host" tag in the telegraf agent.
omit_hostname = false
###############################################################################
# OUTPUT PLUGINS #
###############################################################################
# Configuration for influxdb server to send metrics to
[[outputs.influxdb_v2]]
## The full HTTP or UDP endpoint URL for your InfluxDB instance.
## Multiple urls can be specified as part of the same cluster,
## this means that only ONE of the urls will be written to each interval.
# urls = ["udp://localhost:8089"] # UDP endpoint example
urls = ["http://influxdb:8086"] ## docker-compose internal address
token = "random_token" ## hard-coded; setting it from the environment variable did not work
organization = "ORG" ## organization name; setting it from the environment variable did not work
bucket = "system_state" ## bucket / db name; setting it from the environment variable did not work
#database = "system_state"
## Write timeout (for the InfluxDB client), formatted as a string.
## If not provided, will default to 5s. 0s means no timeout (not recommended).
timeout = "10s"
###############################################################################
# SERVICE INPUT PLUGINS #
###############################################################################
[[inputs.kafka_consumer]]
## Kafka brokers.
brokers = ["kafka1:9092", "localhost:9093"] ## docker-compose internal address of kafka
## Topics to consume.
topics = [ "states"] ## topic to subscribe to
## When set this tag will be added to all metrics with the topic as the value.
#topic_tag = "kafka"
## Optional Client id
client_id = "kti_state" ## "username" of telegraf for kafka
## Set the minimal supported Kafka version. Setting this enables the use of new
## Kafka features and APIs. Must be 0.10.2.0 or greater.
## ex: version = "1.1.0"
# version = ""
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## SASL authentication credentials. These settings should typically be used
## with TLS encryption enabled using the "enable_tls" option.
# sasl_username = "kafka"
# sasl_password = "secret"
## SASL protocol version. When connecting to Azure EventHub set to 0.
# sasl_version = 1
## Name of the consumer group.
# consumer_group = "telegraf_metrics_consumers"
## Initial offset position; one of "oldest" or "newest".
# offset = "oldest"
## Consumer group partition assignment strategy; one of "range", "roundrobin" or "sticky".
# balance_strategy = "range"
## Maximum length of a message to consume, in bytes (default 0/unlimited);
## larger messages are dropped
#max_message_len = 1000000
## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
##
## For example, if each message from the queue contains 10 metrics and the
## output metric_batch_size is 1000, setting this to 100 will ensure that a
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"
## When strict is true and a JSON array is being parsed, all objects within the
## array must be valid
json_strict = true
## Query is a GJSON path that specifies a specific chunk of JSON to be
## parsed, if not specified the whole document will be parsed.
##
## GJSON query paths are described here:
## https://github.com/tidwall/gjson/tree/v1.3.0#path-syntax
json_query = ""
## Tag keys is an array of keys that should be added as tags. Matching keys
## are no longer saved as fields.
tag_keys = []
## Array of glob pattern strings keys that should be added as string fields.
json_string_fields = ["color"]
## Name key is the key to use as the measurement name.
json_name_key = ""
## Time key is the key containing the time that should be used to create the
## metric.
json_time_key = ""
## Time format is the time layout that should be used to interpret the json_time_key.
## The time must be `unix`, `unix_ms`, `unix_us`, `unix_ns`, or a time in the
## "reference time". To define a different format, arrange the values from
## the "reference time" in the example to match the format you will be
## using. For more information on the "reference time", visit
## https://golang.org/pkg/time/#Time.Format
## ex: json_time_format = "Mon Jan 2 15:04:05 -0700 MST 2006"
## json_time_format = "2006-01-02T15:04:05Z07:00"
## json_time_format = "01/02/2006 15:04:05"
## json_time_format = "unix"
## json_time_format = "unix_ms"
json_time_format = ""
## Timezone allows you to provide an override for timestamps that
## don't already include an offset
## e.g. 04/06/2016 12:41:45
##
## Default: "" which renders UTC
## Options are as follows:
## 1. Local -- interpret based on machine localtime
## 2. "America/New_York" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
## 3. UTC -- or blank/unspecified, will return timestamp in UTC
json_timezone = ""
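For what it's worth, my understanding of how the "json" data format maps a message under these settings: numeric JSON keys become fields, keys listed in json_string_fields become string fields, and only keys listed in tag_keys become tags. Since tag_keys = [], no tags are created at all, which might explain the "no tag keys found" message even when data is present. A plain-Python sketch of that mapping (my rough approximation, not Telegraf code):

```python
import json

# Approximate Telegraf's JSON parser with the settings above:
# numeric values -> fields, keys in json_string_fields -> string fields,
# keys in tag_keys -> tags, other strings are dropped.
JSON_STRING_FIELDS = {"color"}  # json_string_fields = ["color"]
TAG_KEYS = set()                # tag_keys = []

def map_message(raw: str):
    obj = json.loads(raw)
    tags, fields = {}, {}
    for key, value in obj.items():
        if key in TAG_KEYS:
            tags[key] = str(value)
        elif isinstance(value, (int, float)) and not isinstance(value, bool):
            fields[key] = value
        elif key in JSON_STRING_FIELDS:
            fields[key] = value
    return tags, fields

tags, fields = map_message('{"temperature": 21.5, "color": "red"}')
print(tags, fields)  # -> {} {'temperature': 21.5, 'color': 'red'}
```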