扭曲集成阻塞 confluence-kafka pythong 库问题

问题描述 投票:0回答:1

让我们考虑一下这段代码

from twisted.web import server, resource
from twisted.internet.task import LoopingCall
from confluent_kafka import Consumer, KafkaException
import json


# Function to handle Kafka consumer
def kafka_consumer():
    def fetch_data():
        def poll_kafka():
            msg = consumer.poll(0.1)  
            if msg is None:
                return
            if msg.error():
                if msg.error().code() == KafkaException._PARTITION_EOF:
                    return
                else:
                    return
            else:
                print("message", msg, msg.value())
                consumer.commit()  # Manually commit the offset

        # Execute Kafka polling in a separate thread
        d1 = threads.deferToThread(poll_kafka)

    def start_loop():
        lc = LoopingCall(fetch_data)
        lc.start(0.5) 

    conf = {
        'bootstrap.servers': 'kafka_internal-1:29093',
        'group.id': 'your_consumer_group-2',  
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False  # Disable autocommit
    }

    consumer = Consumer(conf)
    consumer.subscribe(['jithin_test'])  # <-- is it a blocking call??

    start_loop()

# Web service handler
class WebService(resource.Resource):
    isLeaf = True

    def render_GET(self, request):
        # You can customize the response according to your needs
        response = {
            'message': 'Welcome to the Kafka consumer service!'
        }
        return json.dumps(response).encode('utf-8')


if __name__ == '__main__':
    reactor.callWhenRunning(kafka_consumer)

    # Run the Twisted web service
    root = WebService()
    site = server.Site(root)
    reactor.listenTCP(8180, site)
    reactor.run()

这里我在反应器线程中实例化来自

Consumer
confluent-kafka
对象, 然后将后面的
poll()
留给
deferToThread()
, 我对此没什么疑问,

  1. Consumer.subscribe()
    是不是阻塞调用,我调用的时候应该是
    deferTothread
    这个方法吗?
  2. 如果我使用
    poll_kafka
    再次调用
    deferToThread
    ,它是否会破坏消费者重新平衡的情况(根据我的理解,每次我们使用
    deferToThread
    运行的线程都将来自线程池并且不能保证我们将使用相同的线程)?
  3. 如果是这样,有办法管理吗?也许在一个单独的Python线程中运行整个东西并将消耗的值传递回扭曲的应用程序?
  4. 或者有没有一种方法可以在不破坏消费者的情况下重新使用消费者对象?

Nb:代码是用

python2
编写的,它是一些旧系统的集成,不可能在 atm 上移植整个内容,并且大多数其他库仅支持 python 3+。

python apache-kafka twisted confluent-kafka-python
1个回答
0
投票

如果您想将事情推迟到不是反应器线程的线程,我建议使用 https://docs.twistedmatrix.com/en/stable/api/twisted.internet.threads.html#deferToThreadPool 和自定义 https://docs.twistedmatrix.com/en/stable/api/twisted.python.threadpool.ThreadPool.html 您自己管理,使用

minthreads=1, maxthreads=1

您确实必须对此线程池进行一些烦人的生命周期管理,但在这个简单的示例中,这只是

.start()
中的
kafka_consumer
和添加到
.stop()
的内容中的
reactor.addSystemEventTrigger("before", "shutdown", ...)

© www.soinside.com 2019 - 2024. All rights reserved.