如何从本地机器向远程kafka生产者发送消息? 我不太确定这个问题是否以正确的方式提出。以下是场景。
我有两台机器都在网络 192.168.1.x 中。
IP 192.168.1.123的Server_Ku是Ubuntu 22.04,安装了Kafka v3.6.1,使用Kraft作为控制器。
IP 192.168.1.234 的 Server_Lw 是用于开发的 Windows 10 本地计算机。它安装了 python 和 kafka-python 包,但没有安装 Kafka。 相对于Server_Lw,Server_Ku是远程服务器。
现在两台机器都禁用了ipv6,可以互相ping通了。
在Server_Ku上,本地生产者和消费者都可以很好地使用bootstrap_servers=localhost:9092,没有问题。 下面显示了它的kraft/server.properties: $ sudo nano /opt/kafka/kafka_2.12-3.6.1/config/kraft/server.properties
######## Socket Server Settings ######
# IGNORE THE ABOVE PART OF THE FILE. I did not touch the following original settings.
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092
# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
在Server_Lw上,我可以运行以下生产者代码,它不断发送消息,没有任何错误。如您所见,bootstrap_servers 设置为 Server_Ku (192.168.1.123)。
import pandas as pd
from kafka import KafkaProducer
from json import dumps
import time
KAFKA_TOPIC_NAME_CONS = "orderstopicdemo"
KAFKA_BOOTSTRAP_SERVERS_CONS = "192.168.1.123:9092"
# print("kafka started")
kafka_producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS,
value_serializer=lambda x: dumps(x).encode('utf-8'))
###
# # Get message from a file.
###
file_path = r"D:\Projects\GitHub\KafkaSampleProject\data\orders.csv"
orders_pd_df = pd.read_csv(file_path)
# orders_pd_df['date_time'] = pd.str(Timestamp("now"))
orders_pd_df['date_time'] = str(pd.Timestamp("now"))
# print(orders_pd_df)
# print(orders_pd_df.head(1))
orders_list = orders_pd_df.to_dict(orient='records')
# print(orders_list[0])
for order in orders_list:
message = order
print("Message to be sent:", message)
kafka_producer.send(KAFKA_TOPIC_NAME_CONS, message)
time.sleep(2)
在 Server_Lw 上启动上述生产者 python 代码后,我也在 Server_Lw 上启动了消费者 python 代码。然而,它只是挂在那里,继续运行,没有任何返回或失败。
from kafka import KafkaConsumer
KAFKA_CONSUMER_GROUP_NAME_CONS = "test-consumer-group"
KAFKA_TOPIC_NAME_CONS = "orderstopicdemo"
KAFKA_BOOTSTRAP_SERVERS_CONS = "192.168.1.123:9092"
if __name__ == "__main__":
print("Kafka Consumer Application Started ... ")
try:
consumer = KafkaConsumer(
KAFKA_TOPIC_NAME_CONS,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS,
auto_offset_reset='latest',
enable_auto_commit=True,
group_id=KAFKA_CONSUMER_GROUP_NAME_CONS,
# value_deserializer=lambda x: loads(x.decode('utf-8')))
value_deserializer=lambda x: x.decode('utf-8'))
for message in consumer:
# print(dir(message))
print(type(message))
print("Key: ", message.key)
message = message.value
print("Message received: ", message)
except Exception as ex:
print("Failed to read kafka message.")
print(ex)
好的,接下来我做的是让生产者 python 脚本在 Server_Lw 上运行,但在 Server_Lw 上停止上述消费者 python 脚本。然后我转到 Server_Ku,启动相同的消费者 python 脚本(唯一的区别是在 Server_Ku 上,bootstrap_servers = localhost:9092)。与 Server_Lw 上的消费者脚本相同,它也只是挂在那里,没有任何返回或失败。
生产者和消费者的相同代码,将 bootstrap_servers 设置为 localhost:9092 可以在 Server_Ku 上顺利运行,并在屏幕上打印所有生产和消费的消息。
但是,我想要的是在本地 Server_Lw 上运行 python,并让它与 Server_Ku 上安装的 Kafka 通信。我怎样才能实现它?如何在本地计算机上与远程 Kafka 服务器/集群发送/接收消息?
非常感谢!
解决了! 以下是我所做的。
IPv6
。 IPv6
禁用后会出现随机问题。advertised.listeners=PLAINTEXT://localhost:9092
更新为advertised.listeners=PLAINTEXT://192.168.1.123:9092
Server_Ku
只有几兆可用空间,我扩展了磁盘。