producer = KafkaProducer(bootstrap_servers='kf-p1l-node3:9092,xxxxx,xxxxx',
value_serializer=lambda x: dumps(x).encode('utf-8')) # utf-8
consumer = KafkaConsumer( bootstrap_servers='rdwh-node1:49092,xxxxx,xxxxx',
# bootstrap_servers='kf-p1l-node3:9092,xxxxx,xxxxx',
auto_offset_reset=param["AUTO_OFFSET_RESET"],
consumer_timeout_ms=param["CONSUMER_TIMEOUT_MS"],
enable_auto_commit=False,
auto_commit_interval_ms=60000,
group_id=param["GROUP_ID"],
client_id=param["CLIENT_ID"]
)
consumer.subscribe([param["TOPIC_IN"]])
如果KafkaProducer和KafkaConsumer的bootstrap_server是相同的,这段代码就可以工作。但如果把KafkaConsumer换成其他服务器,就不能用了。
Bootstrap服务器必须包含所有用于建立Kafka集群初始连接的服务器。客户端将使用所有的服务器,而不管这里指定的是哪个服务器用于引导。你可以查看这里的文档。http:/kafka.apache.org090documentation.html。
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['node1:port1', 'node1:port2', 'node2:port3'])