Sending data from Kafka to ClickHouse

Problem description | Votes: 0 | Answers: 1

I have been trying to get data from Kafka into ClickHouse.

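So far I have set up Docker containers running Kafka and ClickHouse. I generate some random data and send it to Kafka through a Kafka producer, flushing after each send. I have also created an SQL schema with a materialized view, as suggested in the Kafka-ClickHouse documentation.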

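I am stuck on two points. When I run the Python file that generates the random data, I can see the messages being sent without any problem, but I am not sure how to view the data in ClickHouse.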

For reference, here are the SQL schema and the Kafka producer code I wrote:

CREATE TABLE IF NOT EXISTS filters (
  userId Nullable(UInt32),
  name String,
  value String
) ENGINE = Kafka SETTINGS
            kafka_broker_list = '192.168.0.43:9092',
            kafka_topic_list = 'my-units',
            kafka_group_name = 'statistics',
            kafka_format = 'JSONEachRow',
            kafka_num_consumers = 2;


CREATE TABLE IF NOT EXISTS filters_stats (
  userId Nullable(UInt32),
  name String,
  value String
) ENGINE = MergeTree()
ORDER BY timestamp;

CREATE MATERIALIZED VIEW IF NOT EXISTS filters_consumer TO filters_stats
  AS SELECT * FROM filters;

Data generation and sending:

import json
import random
import time

from kafka import KafkaProducer


def generate_data():
    message = {
        'uesrId': random.randint(0, 200),
        'name': "pickachu",
        'value': random.randint(0, 10)
    }

    print("TYPE OF MESSAGE", type(message))

    return message




def main():
    for _ in range(100):
        data = generate_data()

        print("DATA", data)

        if data:
            producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('ascii'))
            for key, value in data.items():
                producer.send('json-topic', {key: value})
                producer.flush()
            if producer is not None:
                producer.close()
            else:
                print("[NO KAFKA PRODUCER]")

        time.sleep(5)

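[Edit]: I would like a clearer picture of how these SQL commands are run. From what I have read so far, they run automatically once messages are received from Kafka, but I cannot see any logs or anything else.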

PS: Everything runs in containers, and both Kafka and ClickHouse are up and running, but I cannot see anything. I have added a link to my GitHub repository for reference [here]

Below is my docker compose file for reference:

version: "3.6"

services:
  clickhouse-server:
    image: yandex/clickhouse-server
    volumes:
      - ./clickhouse/config.xml:/etc/clickhouse-server/config.xml
      - ./clickhouse/zookeeper-servers.xml:/etc/clickhouse-server/conf.d/zookeeper-servers.xml
      - ./shared/ch-data/clickhouse:/var/lib/clickhouse
    depends_on:
      - kafka
    links:
      - kafka

  clickhouse-client:
    image: yandex/clickhouse-client
    entrypoint:
      - /bin/sleep
    command:
      - infinity

  kafka:
    image: wurstmeister/kafka:2.11-1.0.2
    volumes:
      - ./shared/ch-data/kafka:/data
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: localhost
    links:
     - zookeeper
    ports:
     - 9092:9092
     - 9094:9094

  zookeeper:
    image: zookeeper
    volumes:
      - ./shared/ch-data/zookeeper:/data
    ports:
      - 2181:2181
apache-kafka kafka-producer-api clickhouse
1 answer
0 votes

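You are sending to this topic:

producer.send('json-topic'

at this address: bootstrap_servers=['localhost:9092']

Your table does not use these settings: it reads from topic 'my-units' on broker '192.168.0.43:9092'. In addition, userId is misspelled as 'uesrId' in the producer, and value is sent as an integer, whereas the table declares it as a String.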
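
The points above could be applied to the producer roughly as follows. This is a minimal sketch, not a tested fix for this exact setup: it assumes the kafka-python package, and the constants TOPIC and BOOTSTRAP_SERVERS and the helper serialize are names introduced here for illustration. It also sends each record as one complete JSON object instead of one message per key, since the JSONEachRow format treats each message as a full row.

```python
import json
import random

# Must match kafka_topic_list in the Kafka engine table.
TOPIC = "my-units"
# Assumption: this address reaches the same broker that the table's
# kafka_broker_list points at; adjust it to your network setup.
BOOTSTRAP_SERVERS = ["localhost:9092"]


def generate_data():
    """Build one complete row matching the columns userId, name, value."""
    return {
        "userId": random.randint(0, 200),      # spelled as in the table
        "name": "pickachu",
        "value": str(random.randint(0, 10)),   # the column is a String
    }


def serialize(message):
    """Encode one row as a single JSON object."""
    return json.dumps(message).encode("utf-8")


def main(count=100):
    # Imported here so the helpers above work without kafka-python installed.
    from kafka import KafkaProducer

    # One producer for the whole run instead of one per message.
    producer = KafkaProducer(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        value_serializer=serialize,
    )
    try:
        for _ in range(count):
            producer.send(TOPIC, generate_data())
        producer.flush()
    finally:
        producer.close()
```

Calling main() sends 100 rows; whether localhost or 192.168.0.43 is the right broker address depends on where the producer runs relative to the containers.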

In general, check the logs of the database process to see whether any errors are reported.
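
Besides the logs, one way to check whether rows are arriving is to query the MergeTree target table (filters_stats) rather than the Kafka engine table, because reading the Kafka table directly consumes messages. The CREATE statements only need to be run once, for example from clickhouse-client; after that the materialized view copies rows in the background. The sketch below uses ClickHouse's HTTP interface and assumes it is reachable on localhost:8123 with the default user, which the compose file shown in the question does not currently publish. build_query_url and run_query are hypothetical helper names.

```python
from urllib.parse import urlencode
from urllib.request import urlopen


def build_query_url(sql, host="localhost", port=8123):
    """Build a URL for ClickHouse's HTTP interface (host and port are assumptions)."""
    return f"http://{host}:{port}/?{urlencode({'query': sql})}"


def run_query(sql, host="localhost", port=8123):
    """Run a read-only query over HTTP and return the raw text response."""
    with urlopen(build_query_url(sql, host, port), timeout=10) as response:
        return response.read().decode("utf-8")
```

For example, run_query("SELECT count() FROM filters_stats") should return a growing number once messages flow through. Separately, filters_stats declares ORDER BY timestamp without defining a timestamp column, so that CREATE TABLE statement is likely to fail; the server log should show the error if it does.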
