Kafka Consumer 没有收到来自主题的消息——如何调试?

问题描述 投票:0回答:0

Producer.py

from binance.websocket.spot.websocket_client import SpotWebsocketClient
from binance.spot import Spot as Client
from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaProducer
import multiprocessing
import logging
import json
import time
import os


def json_serializer(data):
    return json.dumps(data).encode("utf-8")

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=json_serializer)

base_url = 'https://api.binance.com'
stream_url = 'wss://stream.binance.com:9443/ws'
client = Client(base_url=base_url)
ws_client = SpotWebsocketClient(stream_url=stream_url)
order_book = {
    "lastUpdateId": 0,
    "bids": [],
    "asks": []
}


def orderbook_message_handler(symbol,message): 
    
    # print(symbol,symbols_dict[symbol])
    result = producer.send("binance-orderbook",message,partition=symbols_dict[symbol])
    print("result: ",result)

def listen_binance_orderbook(symbol):
    ws_client.start()
    symbol = symbol.lower()
    ws_client.diff_book_depth(
        symbol=symbol.lower(),
        id=1,
        speed=1000,
        callback=lambda message: orderbook_message_handler(symbol, message)
    )

def load_symbols():
    with open('binance_config.json') as f:
        data = json.load(f)
    symbol_dict = {symbol: number for symbol, number in data.items()}
    return symbol_dict

symbols_dict = load_symbols()

def main():

    global symbols_dict
    try:
        admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

        topic = NewTopic(name='binance-orderbook',
                            num_partitions=len(symbols_dict),
                            replication_factor=1)
        admin.create_topics([topic])
    except Exception:
        pass

    processes = []
    for key, value in symbols_dict.items():
        # multiprocessing helps to bypass GIL 
        p = multiprocessing.Process(target=listen_binance_orderbook, args=(key,))
        processes.append(p)

    # start all processes
    for p in processes:
        p.start()

    # wait for all processes to complete
    for p in processes:
        p.join()


main()

Consumer.py

from kafka import KafkaAdminClient, KafkaConsumer
from kafka import TopicPartition
import json

consumer = KafkaConsumer(              
    bootstrap_servers=['localhost:9092'],
    group_id="strategy-two",           
    auto_offset_reset='earliest',  

)

# assign to a specific partition of the topic
partition = TopicPartition('binance-orderbook', 0)
consumer.assign([partition])

# seek to the beginning of the partition
consumer.seek_to_beginning(partition)

for msg in consumer:
    print("inside")
    print("binance orderbook = {}".format(json.loads(msg.value)))


我正在尝试使用 Python 生产者向 Kafka 主题发送消息,但似乎没有发送消息。我在本地主机上有一个正在运行的 Kafka 代理(localhost:9092)和 zookeeper,我正在使用 kafka-python 库发送消息。

我已经检查我的生产者代码运行没有任何错误,当我调用 producer.send() 时,我可以看到输出“result: ”。但是,我的消费者代码似乎没有收到来自 Kafka 主题的任何消息。

我已经检查过我的消费者代码没有任何错误地运行,并且我已经将它分配给了正确的主题和分区。我还将 auto_offset_reset 设置为“最早”以确保它从主题的开头开始。

我该如何调试这个问题?我可以采取哪些步骤来检查消息是否真的被发送到 Kafka,以及我的消费者代码是否从正确的主题和分区读取?

消费者似乎闲着,没有收到任何消息。 我先运行生产者,然后运行消费者

任何帮助将不胜感激。谢谢!

python events apache-kafka kafka-consumer-api kafka-producer-api
© www.soinside.com 2019 - 2024. All rights reserved.