我在 Kafka 中有 3 个消费者和 1 个生产者。
当生产者发送所有消息时(我的简单代码中有100条消息),这些消息被分配给三个消费者,而我的主要问题就是这种消息的划分。
有时一条消息可能很长,这就是为什么一个消费者可能无法快速回复所有消息,而另一个快速回复所有消息的消费者却变得闲置无事可做。
如何将所有消息放入队列中,并且每当消费者完成工作时,然后从生产者那里接收下一条消息? (当然,我不知道消费者是否接收来自生产者或主题的消息,我是这个领域的初学者)
谢谢你对我的彻底指导。
我录制了一个关于工作过程的视频,请观看。视频显示,一名消费者已完成工作并处于闲置状态,但另外两名消费者正在运行。
电影链接。
我的代码:
主题:
kafka-topics --bootstrap-server localhost:9092 --create --topic numbers --partitions 4 --replication-factor 1
生产者.py:
from time import sleep
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))
for e in range(100):
data = {'number' : e}
producer.send('numbers', value=data)
print(f"Sending data : {data}")
consumer0.py:
import json
from kafka import KafkaConsumer
print("Connecting to consumer ...")
consumer = KafkaConsumer(
'numbers',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
for message in consumer:
print(f"{message.value}")
consumer1.py:
import json
from kafka import KafkaConsumer
import time
print("Connecting to consumer ...")
consumer = KafkaConsumer(
'numbers',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
for message in consumer:
time.sleep(1)
print(f"{message.value}")
consumer2.py:
import json
from kafka import KafkaConsumer
import time
print("Connecting to consumer ...")
consumer = KafkaConsumer(
'numbers',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
for message in consumer:
time.sleep(2)
print(f"{message.value}")
这些消息被分配给三个消费者
这是无法保证的。 Kafka 对记录进行批处理,根据批次的大小,所有记录最终可能会出现在一个分区中。
您可以发送带有密钥的数据来强制执行不同的分区,假设每个密钥将根据分区数进行散列和取模为唯一值,但是生产者无法强制将消息广播给同一组的所有消费者将相同的事件复制到每个分区
很快变得无所事事,无所事事。
然后产生更多数据。如果您想批量发送 100 条记录,并且从不发送新事件,那么也许您并不真正需要流处理
每当消费者完成工作时,就会收到来自生产者的下一条消息
这正是你的 for 循环正在做的事情
不知道消费者是否收到来自生产者或主题的消息
您可以使用消费者滞后指标监控工具来检测这一点