如何向同一网络中的远程Kafka服务器/集群发送消息?

问题描述 投票:0回答:1

如何从本地机器向远程kafka生产者发送消息? 我不太确定这个问题是否以正确的方式提出。以下是场景。

我有两台机器都在网络 192.168.1.x 中。

IP 192.168.1.123的Server_Ku是Ubuntu 22.04,安装了Kafka v3.6.1,使用Kraft作为控制器。

IP 192.168.1.234 的 Server_Lw 是用于开发的 Windows 10 本地计算机。它安装了 python 和 kafka-python 包,但没有安装 Kafka。 相对于Server_Lw,Server_Ku是远程服务器。

现在两台机器都禁用了ipv6,可以互相ping通了。

在Server_Ku上,本地生产者和消费者都可以很好地使用bootstrap_servers=localhost:9092,没有问题。 下面显示了它的kraft/server.properties: $ sudo nano /opt/kafka/kafka_2.12-3.6.1/config/kraft/server.properties

######## Socket Server Settings ######
# IGNORE THE ABOVE PART OF THE FILE. I did not touch the following original settings.


listeners=PLAINTEXT://:9092,CONTROLLER://:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092

# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

在Server_Lw上,我可以运行以下生产者代码,它不断发送消息,没有任何错误。如您所见,bootstrap_servers 设置为 Server_Ku (192.168.1.123)。

import pandas as pd
from kafka import KafkaProducer
from json import dumps
import time
KAFKA_TOPIC_NAME_CONS = "orderstopicdemo"
KAFKA_BOOTSTRAP_SERVERS_CONS = "192.168.1.123:9092"
# print("kafka started")
kafka_producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS,
                               value_serializer=lambda x: dumps(x).encode('utf-8'))
###
# # Get message from a file.
###
file_path = r"D:\Projects\GitHub\KafkaSampleProject\data\orders.csv"
orders_pd_df = pd.read_csv(file_path)
# orders_pd_df['date_time'] = pd.str(Timestamp("now"))
orders_pd_df['date_time'] = str(pd.Timestamp("now"))
# print(orders_pd_df)
# print(orders_pd_df.head(1))
orders_list = orders_pd_df.to_dict(orient='records')
# print(orders_list[0])
for order in orders_list:
    message = order
    print("Message to be sent:", message)
    kafka_producer.send(KAFKA_TOPIC_NAME_CONS, message)
    time.sleep(2)

在 Server_Lw 上启动上述生产者 python 代码后,我也在 Server_Lw 上启动了消费者 python 代码。然而,它只是挂在那里,继续运行,没有任何返回或失败。

from kafka import KafkaConsumer
KAFKA_CONSUMER_GROUP_NAME_CONS = "test-consumer-group"        
KAFKA_TOPIC_NAME_CONS = "orderstopicdemo"
KAFKA_BOOTSTRAP_SERVERS_CONS = "192.168.1.123:9092"
    
if __name__ == "__main__":
    print("Kafka Consumer Application Started ... ")
    try:
        consumer = KafkaConsumer(
            KAFKA_TOPIC_NAME_CONS,
            bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS,
            auto_offset_reset='latest',
            enable_auto_commit=True,
            group_id=KAFKA_CONSUMER_GROUP_NAME_CONS,
            # value_deserializer=lambda x: loads(x.decode('utf-8')))
            value_deserializer=lambda x: x.decode('utf-8'))
        for message in consumer:
            # print(dir(message))
            print(type(message))
            print("Key: ", message.key)
            message = message.value
            print("Message received: ", message)
    except Exception as ex:
        print("Failed to read kafka message.")
        print(ex)

好的,接下来我做的是让生产者 python 脚本在 Server_Lw 上运行,但在 Server_Lw 上停止上述消费者 python 脚本。然后我转到 Server_Ku,启动相同的消费者 python 脚本(唯一的区别是在 Server_Ku 上,bootstrap_servers = localhost:9092)。与 Server_Lw 上的消费者脚本相同,它也只是挂在那里,没有任何返回或失败。

生产者和消费者的相同代码,将 bootstrap_servers 设置为 localhost:9092 可以在 Server_Ku 上顺利运行,并在屏幕上打印所有生产和消费的消息。

但是,我想要的是在本地 Server_Lw 上运行 python,并让它与 Server_Ku 上安装的 Kafka 通信。我怎样才能实现它?如何在本地计算机上与远程 Kafka 服务器/集群发送/接收消息?

非常感谢!

python apache-kafka remote-server
1个回答
0
投票

解决了! 以下是我所做的。

  1. 请勿在 Ubuntu 上禁用
    IPv6
    IPv6
    禁用后会出现随机问题。
  2. advertised.listeners=PLAINTEXT://localhost:9092
    更新为
    advertised.listeners=PLAINTEXT://192.168.1.123:9092
  3. 我发现我的
    Server_Ku
    只有几兆可用空间,我扩展了磁盘。
© www.soinside.com 2019 - 2024. All rights reserved.