我一直在尝试将数据从Kafka传递到Click House。
所以目前,我已经创建了一个运行kafka和clickhouse的docker容器。通过创建通过kafka生产者刷新的数据,我创建了一些随机数据集发送给kafka。我已经按照kafka-clickhouse文档中的建议创建了一个带有实例化视图的sql模式。
我被困在两点上,当我运行生成随机数据的python文件时,我可以看到发送的消息没有问题。但是我不确定如何在Clickhouse中查看它。
作为参考,我添加了我编写的SQL模式和kafka生产者部分:
CREATE TABLE IF NOT EXISTS filters (
userId Nullable(UInt32),
name String,
value String
) ENGINE = Kafka SETTINGS
kafka_broker_list = '192.168.0.43:9092',
kafka_topic_list = 'my-units',
kafka_group_name = 'statistics',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 2
CREATE TABLE IF NOT EXISTS filters_stats (
userId Nullable(UInt32),
name String,
value String
) ENGINE = MergeTree()
ORDER BY timestamp
CREATE MATERIALIZED VIEW IF NOT EXISTS filters_consumer TO filters_stats
AS SELECT * FROM filters;
数据创建和发送:
def generate_data():
message = {
'uesrId': random.randint(0, 200),
'name': "pickachu",
'value': random.randint(0, 10)
}
print("TYPE OF MESSAGE", type(message))
return message
def main():
for _ in range(100):
data = generate_data()
print("DATA", data)
if data:
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('ascii'))
for key, value in data.items():
producer.send('json-topic', {key: value})
producer.flush()
if producer is not None:
producer.close()
else:
print("[NO KAFKA PRODUCER]")
time.sleep(5)
[编辑]:我想更清楚地了解如何运行这些sql命令,到目前为止,我所读到的这些命令都是在从kafka接收消息后自动运行的。但是我看不到任何日志或任何东西。
PS:我正在容器中运行,因此我的kafka和Clickhouse都运行良好,但是我无法看到任何东西。我在这里添加了我的github链接以供参考[here]。
下面,我添加了我的docker compose文件以供参考
version: "3.6"
services:
clickhouse-server:
image: yandex/clickhouse-server
volumes:
- ./clickhouse/config.xml:/etc/clickhouse-server/config.xml
- ./clickhouse/zookeeper-servers.xml:/etc/clickhouse-server/conf.d/zookeeper-servers.xml
- ./shared/ch-data/clickhouse:/var/lib/clickhouse
depends_on:
- kafka
links:
- kafka
clickhouse-client:
image: yandex/clickhouse-client
entrypoint:
- /bin/sleep
command:
- infinity
kafka:
image: wurstmeister/kafka:2.11-1.0.2
volumes:
- ./shared/ch-data/kafka:/data
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: localhost
links:
- zookeeper
ports:
- 9092:9092
- 9094:9094
zookeeper:
image: zookeeper
volumes:
- ./shared/ch-data/zookeeper:/data
ports:
- 2181:2181
您正在发送给本主题
producer.send('json-topic'
在此地址bootstrap_servers=['localhost:9092']
您的表未使用这些设置。此外,userId拼写错误,并且值是整数,而不是字符串。
总体上,请尝试查看数据库进程的日志以查看是否出现任何错误