Producer.py
from binance.websocket.spot.websocket_client import SpotWebsocketClient
from binance.spot import Spot as Client
from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaProducer
import multiprocessing
import logging
import json
import time
import os
def json_serializer(data):
return json.dumps(data).encode("utf-8")
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=json_serializer)
base_url = 'https://api.binance.com'
stream_url = 'wss://stream.binance.com:9443/ws'
client = Client(base_url=base_url)
ws_client = SpotWebsocketClient(stream_url=stream_url)
order_book = {
"lastUpdateId": 0,
"bids": [],
"asks": []
}
def orderbook_message_handler(symbol,message):
# print(symbol,symbols_dict[symbol])
result = producer.send("binance-orderbook",message,partition=symbols_dict[symbol])
print("result: ",result)
def listen_binance_orderbook(symbol):
ws_client.start()
symbol = symbol.lower()
ws_client.diff_book_depth(
symbol=symbol.lower(),
id=1,
speed=1000,
callback=lambda message: orderbook_message_handler(symbol, message)
)
def load_symbols():
with open('binance_config.json') as f:
data = json.load(f)
symbol_dict = {symbol: number for symbol, number in data.items()}
return symbol_dict
symbols_dict = load_symbols()
def main():
global symbols_dict
try:
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
topic = NewTopic(name='binance-orderbook',
num_partitions=len(symbols_dict),
replication_factor=1)
admin.create_topics([topic])
except Exception:
pass
processes = []
for key, value in symbols_dict.items():
# multiprocessing helps to bypass GIL
p = multiprocessing.Process(target=listen_binance_orderbook, args=(key,))
processes.append(p)
# start all processes
for p in processes:
p.start()
# wait for all processes to complete
for p in processes:
p.join()
main()
Consumer.py
from kafka import KafkaAdminClient, KafkaConsumer
from kafka import TopicPartition
import json
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'],
group_id="strategy-two",
auto_offset_reset='earliest',
)
# assign to a specific partition of the topic
partition = TopicPartition('binance-orderbook', 0)
consumer.assign([partition])
# seek to the beginning of the partition
consumer.seek_to_beginning(partition)
for msg in consumer:
print("inside")
print("binance orderbook = {}".format(json.loads(msg.value)))
我正在尝试使用 Python 生产者向 Kafka 主题发送消息,但似乎没有发送消息。我在本地主机上有一个正在运行的 Kafka 代理(localhost:9092)和 zookeeper,我正在使用 kafka-python 库发送消息。
我已经检查我的生产者代码运行没有任何错误,当我调用 producer.send() 时,我可以看到输出“result:
我已经检查过我的消费者代码没有任何错误地运行,并且我已经将它分配给了正确的主题和分区。我还将 auto_offset_reset 设置为“最早”以确保它从主题的开头开始。
我该如何调试这个问题?我可以采取哪些步骤来检查消息是否真的被发送到 Kafka,以及我的消费者代码是否从正确的主题和分区读取?
消费者似乎闲着,没有收到任何消息。 我先运行生产者,然后运行消费者
任何帮助将不胜感激。谢谢!