让我们考虑一下这段代码
from twisted.web import server, resource
from twisted.internet.task import LoopingCall
from confluent_kafka import Consumer, KafkaException
import json
# Function to handle Kafka consumer
def kafka_consumer():
def fetch_data():
def poll_kafka():
msg = consumer.poll(0.1)
if msg is None:
return
if msg.error():
if msg.error().code() == KafkaException._PARTITION_EOF:
return
else:
return
else:
print("message", msg, msg.value())
consumer.commit() # Manually commit the offset
# Execute Kafka polling in a separate thread
d1 = threads.deferToThread(poll_kafka)
def start_loop():
lc = LoopingCall(fetch_data)
lc.start(0.5)
conf = {
'bootstrap.servers': 'kafka_internal-1:29093',
'group.id': 'your_consumer_group-2',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False # Disable autocommit
}
consumer = Consumer(conf)
consumer.subscribe(['jithin_test']) # <-- is it a blocking call??
start_loop()
# Web service handler
class WebService(resource.Resource):
isLeaf = True
def render_GET(self, request):
# You can customize the response according to your needs
response = {
'message': 'Welcome to the Kafka consumer service!'
}
return json.dumps(response).encode('utf-8')
if __name__ == '__main__':
reactor.callWhenRunning(kafka_consumer)
# Run the Twisted web service
root = WebService()
site = server.Site(root)
reactor.listenTCP(8180, site)
reactor.run()
这里我在反应器线程中实例化来自
Consumer
的 confluent-kafka
对象,
然后将后面的poll()
留给deferToThread()
,
我对此没什么疑问,
Consumer.subscribe()
是不是阻塞调用,我调用的时候应该是deferTothread
这个方法吗?poll_kafka
再次调用 deferToThread
,它是否会破坏消费者重新平衡的情况(根据我的理解,每次我们使用 deferToThread
运行的线程都将来自线程池并且不能保证我们将使用相同的线程)?Nb:代码是用
python2
编写的,它是一些旧系统的集成,不可能在 atm 上移植整个内容,并且大多数其他库仅支持 python 3+。
如果您想将事情推迟到不是反应器线程的线程,我建议使用 https://docs.twistedmatrix.com/en/stable/api/twisted.internet.threads.html#deferToThreadPool 和自定义 https://docs.twistedmatrix.com/en/stable/api/twisted.python.threadpool.ThreadPool.html 您自己管理,使用
minthreads=1, maxthreads=1
。
您确实必须对此线程池进行一些烦人的生命周期管理,但在这个简单的示例中,这只是
.start()
中的 kafka_consumer
和添加到 .stop()
的内容中的 reactor.addSystemEventTrigger("before", "shutdown", ...)
。