I have an Airflow DAG with a BashOperator that runs a Kafka producer, which generates a random number of messages. Those messages are picked up by a Kafka consumer, which writes them to a JSON file. However, I want the consumer to stop gracefully after it has processed a specific number of messages, without causing an error in the Airflow DAG.
I have considered using a timeout, but I would prefer a cleaner solution. Is there a recommended way to achieve this? I want the DAG to continue with the next task, without errors, once the consumer has processed the initial batch of messages.
Any help or guidance on the best approach for this scenario would be greatly appreciated!
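For context, the DAG wiring the two scripts together is roughly the minimal sketch below (the dag_id, task ids, script paths, and schedule are assumptions, not my exact setup):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="clickstream_pipeline",      # assumed name
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Producer task: runs the producer script below.
    produce = BashOperator(
        task_id="produce_messages",
        bash_command="python /home/airflow/producer.py",
    )
    # Consumer task: runs the consumer script below and should finish cleanly.
    consume = BashOperator(
        task_id="consume_messages",
        bash_command="python /home/airflow/consumer.py",
    )
    produce >> consume

The producer script: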
import time
import json
import random
from datetime import datetime
from kafka import KafkaProducer


def generate_data() -> dict:
    """Build one random clickstream-style message."""
    id_val = random.randint(1, 1000000)
    cur_timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    platform_type = random.choice(['ios', 'android', 'web', 'mobile-web'])
    messages = {
        'id': id_val,
        'cur_timestamp': cur_timestamp,
        'type': platform_type
    }
    return messages


def serializer(messages):
    return json.dumps(messages).encode('utf-8')


topic = "new_topic"
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=serializer,
    api_version=(0, 11, 5)
)

# Random number of messages per run, so the consumer cannot know the count in advance.
record_cnt = random.randint(10, 100)


def produce_msg():
    for _ in range(record_cnt):
        send_msg = generate_data()
        producer.send(topic, send_msg)
        print(f'Producing message {send_msg}')
    producer.flush()
    producer.close()


produce_msg()
print('produce finished')
The consumer script:

import json
from kafka import KafkaConsumer

topic = "new_topic"
json_path = '/home/airflow/clickstream.json'

consumer = KafkaConsumer(
    topic,
    bootstrap_servers="localhost:9092",
    enable_auto_commit=True,
    auto_offset_reset='earliest'
)


def consumer_to_json():
    # Write each consumed message as one JSON line.
    with open(json_path, 'w') as json_file:
        for send_msg in consumer:
            message_value = send_msg.value.decode("utf-8")
            json_data = json.loads(message_value)
            print(json_data)
            json.dump(json_data, json_file)
            json_file.write('\n')
    # Never reached: the iterator above blocks forever waiting for new messages.
    consumer.close()
    print('finish')


if __name__ == "__main__":
    consumer_to_json()
Could you just count them? Keep a counter inside the consume loop and break out once it reaches your limit:
num_messages = 0
for send_msg in consumer:
    message_value = send_msg.value.decode("utf-8")
    json_data = json.loads(message_value)
    print(json_data)
    json.dump(json_data, json_file)
    json_file.write('\n')
    num_messages += 1
    if num_messages >= MESSAGES_LIMIT:  # TODO: define MESSAGES_LIMIT
        break
consumer.close()
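Putting it together, the consumer script could look like the sketch below. MESSAGES_LIMIT is a constant I'm assuming here (it isn't in your code): set it to however many messages you expect the producer run to have written, or pass it in via an environment variable or CLI argument. Once the loop breaks, the script exits with status 0, so the BashOperator task succeeds and the DAG moves on to the next step. Because your producer writes a random number of records (random.randint(10, 100)), a fixed limit may overshoot what was actually produced, so I also set kafka-python's consumer_timeout_ms as a fallback: it stops the iteration if no message arrives within that window, which keeps the task from hanging when fewer messages than expected are available. You said you'd rather not rely on a timeout alone, and here it only backs up the counter.

import json
from kafka import KafkaConsumer

topic = "new_topic"
json_path = '/home/airflow/clickstream.json'
MESSAGES_LIMIT = 100  # assumed value; tune to your pipeline


def consumer_to_json():
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers="localhost:9092",
        enable_auto_commit=True,
        auto_offset_reset='earliest',
        # Fallback: stop iterating if no message arrives for 30 s,
        # so the task cannot block forever.
        consumer_timeout_ms=30000,
    )
    num_messages = 0
    with open(json_path, 'w') as json_file:
        for send_msg in consumer:
            json_data = json.loads(send_msg.value.decode("utf-8"))
            print(json_data)
            json.dump(json_data, json_file)
            json_file.write('\n')
            num_messages += 1
            if num_messages >= MESSAGES_LIMIT:
                break
    consumer.close()
    print(f'finished after {num_messages} messages')


if __name__ == "__main__":
    consumer_to_json()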