如何在Python中处理特定数量的消息后优雅地停止Kafka消费者?

问题描述 投票:0回答:1

我有一个带有 BashOperator 的 Airflow DAG,它运行 Kafka 生产者,生成随机数量的消息。这些消息由 Kafka 消费者消费,并将它们写入 JSON 文件。但是,我希望消费者在处理特定数量的消息后优雅地停止,而不会导致 Airflow DAG 中出现错误。

我考虑过使用超时,但我更喜欢更干净的解决方案。有推荐的方法来实现这一目标吗?我希望 DAG 在消费者处理完初始消息集后继续执行下一步而不会出现错误。

任何有关此场景最佳方法的帮助或指导将不胜感激!

我的生产者代码是:

import time 
import json 
import random 
from datetime import datetime
from kafka import KafkaProducer


def generate_data() -> dict: 
    
    id_val = random.randint(1,1000000) 
    cur_timestamp = time.strftime("%Y-%m-%d %H:%M:%S") 
    platform_type = random.choice(['ios','android','web','mobile-web']) 
    messages = {
        'id': id_val,
        'cur_timestamp': cur_timestamp,
        'type': platform_type
    }
    return messages


def serializer(messages):
    return json.dumps(messages).encode('utf-8') 


topic = "new_topic" 
producer = KafkaProducer( 
    bootstrap_servers=['localhost:9092'],
        value_serializer=serializer,
    api_version=(0,11,5)
) 

record_cnt = random.randint(10,100)
def produce_msg():
    for _ in range(record_cnt):
        send_msg = generate_data()
        producer.send(topic, send_msg)
        print(f'Producing message {str(send_msg)}')
    producer.flush()    
    producer.close()

                
produce_msg() 

print('produce finished')

我的消费者代码是:

import json 
from kafka import KafkaConsumer

topic = "new_topic" 
json_path = '/home/airflow/clickstream.json'
consumer = KafkaConsumer(topic, bootstrap_servers="localhost:9092",
enable_auto_commit=True,auto_offset_reset='earliest')

def consumer_to_json():
    with open(json_path, 'w') as json_file:
        for send_msg in consumer:
            message_value = send_msg.value.decode("utf-8")
     
            json_data = json.loads(message_value)
            print(json_data)
            json.dump(json_data, json_file)
            json_file.write('\n')
        consumer.close()
        print ('finish')  


if __name__ == "__main__":
    consumer_to_json()
python apache-kafka kafka-consumer-api kafka-producer-api kafka-python
1个回答
0
投票

你可以数一下吗?

num_messages = 0
for send_msg in consumer:
    message_value = send_msg.value.decode("utf-8")
    json_data = json.loads(message_value)
    print(json_data)
    json.dump(json_data, json_file)
    json_file.write('\n')
    num_messages += 1
    if num_messages >= MESSAGES_LIMIT:  # TODO: define       
        break
consumer.close()
© www.soinside.com 2019 - 2024. All rights reserved.