我对Python很新,只是开始使用Kafka,所以请原谅我的术语,如果我在某处错了。
所以我有一个基于Django的Web应用程序,我在同一个进程中通过Kafka Producer发送json消息。然而,在以实用方式创建主题时,我还在针对该特定主题的单独进程中开始(订阅)新的使用者。
#Consumer code snippet
if topic_name is not None :
#Create topic
create_kafka_topic_instance(topic_name)
#Initialize a consumer and subscribe to topic
Process(target=init_kafka_consumer_instance, args=(topic_name))
def forgiving_json_deserializer(v):
if v is None :
return
try:
return json.loads(v.decode('utf-8'))
except json.decoder.JSONDecodeError:
import traceback
print(traceback.format_exc())
return None
def init_kafka_consumer_instance(topic, group_id=None):
try:
if topic is None:
raise Exception("Invalid argument topic")
comsumer = None
comsumer = KafkaConsumer(topic, bootstrap_servers=[KAFKA_BROKER_URL], auto_offset_reset="earliest",
urn comsumer
except Exception as e:
import traceback
print(traceback.format_exc())
return Noneurn comsumer
except Exception as e:
import traceback
print(traceback.format_exc())
return None
制片人代码片段
# assuming obj is a model instance
serialized_obj = serializers.serialize('json', [ order, ])
#send_message(topic_name,order)
producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER_URL], value_serializer=lambda x: json.dumps(x).encode('utf-8'))
x = producer.send("test", serialized_obj)
producer.flush()
现在我有一些查询,所以如果我的Django应用程序(服务器)以某种方式重新启动,我仍然会让消费者听取该主题。
我在消费者中也有一些打印语句,我无法在服务器控制台中看到。
但是在python shell中编写相同的代码片段(初始化一个使用者),我可以在那里看到print语句中的消息,这意味着我的Producer工作正常。
Kafka Server不依赖于您的Django应用程序(服务器)。但你的消费者是肯定的。
所以你的主题在Kafka服务器中仍然存在(如果kafka服务器死了,那就是另一个故事),但你的Consumer会重新启动你的应用程序。
因此,如果您希望您的消费者能够正常工作,请将其设置为与您的应用程序并行工作的Worker,并且在您的应用程序关闭时不会重新启动