我试图用 Filebeat 将收集到的日志文件发送到 Kafka,再由 ClickHouse 从 Kafka 消费并存储这些日志。目前 Filebeat 已经能把日志发送到 Kafka,但日志没有被写入 ClickHouse。检查各容器时我没有看到任何异常日志。请指导我这里哪里做错了。
docker-compose.yml
version: "3.5"

services:
  # Ships ./logs/*.log into Kafka (see filebeat.yml).
  filebeat:
    image: docker.elastic.co/beats/filebeat:8.0.1
    container_name: filebeat
    volumes:
      - "./logs:/var/log/"
      - "./filebeat.yml:/usr/share/filebeat/filebeat.yml"
    networks:
      - filebeat-net
    depends_on:
      - kafka

  zookeeper:
    image: docker.io/bitnami/zookeeper:3.7
    container_name: zookeeper
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper-data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    networks:
      - filebeat-net

  kafka:
    image: docker.io/bitnami/kafka:3
    container_name: kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    volumes:
      - "kafka-data:/bitnami"
    environment:
      # Bitnami images only honour KAFKA_CFG_*-prefixed variables.
      # The wurstmeister-style HOSTNAME_COMMAND and the un-prefixed
      # KAFKA_ADVERTISED_LISTENERS were silently ignored and conflicted
      # with KAFKA_CFG_ADVERTISED_LISTENERS below, so they are removed.
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
      # Containers on filebeat-net reach the broker as kafka:9092;
      # clients on the Docker host use localhost:29092.
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
    depends_on:
      - zookeeper
    networks:
      - filebeat-net

  clickhouse-server:
    image: clickhouse/clickhouse-server:22.5.1
    container_name: clickhouse-server
    hostname: clickhouse-server
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
    ports:
      - "8123:8123"
      - "9000:9000"
    volumes:
      # Executed once on first start against an empty data directory;
      # creates the Kafka-engine tables (see init.sql).
      - "./init.sql:/docker-entrypoint-initdb.d/init-db.sql"
    depends_on:
      - kafka
    networks:
      - filebeat-net

networks:
  filebeat-net:
    name: filebeat-net
    driver: bridge

volumes:
  zookeeper-data:
    name: zookeeper-data
    driver: local
  kafka-data:
    name: kafka-data
    driver: local
filebeat.yml
filebeat.inputs:
  # 'input_type' has been deprecated in favour of 'type' since Filebeat 6.x.
  - type: log
    paths:
      - "/var/log/*.log"

output.kafka:
  hosts: ["kafka:9092"]
  # Must match the topic the ClickHouse Kafka-engine table consumes
  # ('hello-messages' in init.sql). It was 'hello-messages-1', so
  # ClickHouse never received any of the produced messages.
  topic: hello-messages
init.sql
-- Destination table: stores one row per parsed log event.
CREATE TABLE IF NOT EXISTS mylogger (
id String,
event_time DateTime64(6),
details_json String)
ENGINE = MergeTree() PARTITION BY toYYYYMM(event_time) ORDER BY (id, event_time) SETTINGS index_granularity=8192;
-- Kafka consumer table. Positional Kafka() args are:
-- (broker_list, topic_list, consumer_group, format).
-- NOTE(review): this consumes topic 'hello-messages' — the producer must
-- publish to the exact same topic (filebeat.yml used 'hello-messages-1',
-- so nothing was ever consumed here).
CREATE TABLE IF NOT EXISTS
mylogger_kafka ( payload String ) ENGINE = Kafka('kafka:9092', 'hello-messages', 'KAFKA2CH_click', 'JSONAsString');
-- Materialized view: fires on every consumed message and inserts the
-- extracted columns into mylogger.
-- NOTE(review): the JSON paths below assume a Debezium-style envelope
-- {"payload":{"after":{...}}}. Filebeat's Kafka output sends its own
-- event JSON (fields like "@timestamp" and "message"), in which case
-- these extracts return empty strings — verify the actual message shape
-- on the topic and adjust the paths accordingly.
CREATE MATERIALIZED VIEW IF NOT EXISTS mylogger_kafka_consumer TO mylogger AS
SELECT JSONExtractString(payload, 'payload', 'after', 'id') as id,
toDateTime64(JSONExtractString(payload, 'payload', 'after', 'event_time'), 3, 'Asia/Jerusalem') as event_time,
JSONExtractString(payload, 'payload', 'after', 'details_json') as details_json
FROM mylogger_kafka;