Kafka 消费者记录 - 处理

问题描述 投票:0回答:0

对于 kafka,我使用 getmany 来读取消费者消息。在总共 650 条消息(大约需要 3 天的时间来处理)中,处理大约 100-150 条记录(有时 12 小时或有时 24 小时),然后没有进一步的处理发生。但是消费者流并没有关闭,当我放入一条新消息时它正在处理,但无法弄清楚为什么会发生。

我认为可能会跳过某些消息,因此要检查我是否尝试仅生成 25 条消息 - 所有已处理(3 小时)然后生成 100 条消息 - 所有已处理(12 小时),在这些情况下消息不会被跳过。只有当很多消息(650)没有被放置时才会发生这个问题。

代码片段:

async def consume(loop,lock):
    logger.info('Inside Consume')

    consumer = AIOKafkaConsumer(KAFKA_TOPIC,
                        loop=loop,
                        bootstrap_servers=bootstrap_servers,
                        group_id=group_id,           
                        enable_auto_commit=enable_auto_commit,       
                        auto_commit_interval_ms=auto_commit_interval_ms,  
                        auto_offset_reset=auto_offset_reset,  
                        max_poll_records= 1,
                        max_poll_interval_ms=1500000,
                        rebalance_timeout_ms=1500000)
              
    await consumer.start()
    while True:
        result = await consumer.getmany(timeout_ms=1500000, max_records=1)
        for tp, msg in result.items(): 
            if msg:
                message = msg[0].value.decode()
                message_json = json.loads(message)
                data_encoded = message_json['payload']

                if data_encoded.get('content') is not None:
                    msg = data_encoded['content']
                    message_dict_dum = base64.b64decode(msg)
                    message_dict = json.loads(message_dict_dum)
                    
                    if message_dict.get("key1") is not None:
                        if message_dict["key1"] == "something":
                            logger.info('Hurray new message to process')
                            logger.info(message_dict["id"])
                            await process(data_encoded['content'],lock)
                            

async def process(msg, lock):
    async with lock:
        logger.info('processing... ->{}'.format(msg))
        await processmessage(msg)
        logger.info('done processing')        


if __name__ == "__main__":
    logger.info('Inside main')

    loop = asyncio.new_event_loop()
    lock = asyncio.Lock(loop=loop)

    loop.create_task(consume(loop, lock))
    loop.run_forever()
apache-kafka kafka-consumer-api aiokafka
© www.soinside.com 2019 - 2024. All rights reserved.