I can't send or receive messages to Kafka from a Python client


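I'm trying to create a pod that runs Kafka and ZooKeeper and connect to it from Python. With my current configuration I can connect to the port Kafka exposes, but I can't send or receive messages from my Python client.

I suspect the problem is in my pod configuration. I have defined the topic and used Lens to expose the Kafka service port. I can confirm the client connects to Kafka, but when I send messages from the producer and then check inside the container, none of them have arrived. The consumer doesn't receive anything either. I have ruled out port and topic mistakes, since I have checked both thoroughly.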

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-zookeeper
spec:
  serviceName: kafka-zookeeper-service
  replicas: 1
  selector:
    matchLabels:
      app: kafka-zookeeper
  template:
    metadata:
      labels:
        app: kafka-zookeeper
    spec:
      containers:
      - name: zookeeper
        image: docker.io/bitnami/zookeeper:3.9
        env:
        - name: ALLOW_ANONYMOUS_LOGIN
          value: "yes"
        - name: ZOO_VOLUME_DIR
          value: /bitnami/zookeeper
        ports:
        - containerPort: 2181
          name: zookeeper
        volumeMounts:
        - name: zookeeper-data
          mountPath: /bitnami/zookeeper
      - name: kafka
        image: docker.io/bitnami/kafka:3.4
        env:
        - name: KAFKA_CFG_ZOOKEEPER_CONNECT
          value: kafka-zookeeper-service:2181

        - name: KAFKA_VOLUME_DIR
          value: /bitnami/kafka

        - name: KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE
          value: "true"

        - name: KAFKA_CFG_PROCESS_ROLES
          value: controller,broker

        - name: KAFKA_CFG_LISTENERS
          value: "PLAINTEXT://:9092,CONTROLLER://:9093"

        - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
          value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"

        - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
          value: "1001@localhost:9093"

        - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
          value: "CONTROLLER"

        - name: KAFKA_CFG_NODE_ID
          value: "1001"

        - name: KAFKA_CFG_ZOOKEEPER_METADATA_MIGRATION_ENABLE
          value: "true"
        ports:
        - containerPort: 9092
          name: kafka
        - containerPort: 9093
          name: kafka-plaintext
        - containerPort: 9094
          name: kafka-tls
        volumeMounts:
        - name: kafka-data
          mountPath: /bitnami/kafka
      volumes:
      - name: zookeeper-data
        persistentVolumeClaim:
          claimName: zookeeper-data-pvc
      - name: kafka-data
        persistentVolumeClaim:
          claimName: kafka-data-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: zookeeper-data-pvc
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 2Gi

---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: kafka-data-pvc
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 2Gi
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-zookeeper-service
spec:
  selector:
    app: kafka-zookeeper
  ports:
  - protocol: TCP
    port: 2181
    targetPort: 2181
    name: zookeeper
  - protocol: TCP
    port: 9092
    targetPort: 9092
    name: kafka
  - protocol: TCP
    port: 9093
    targetPort: 9093
    name: kafka-plaintext
  - protocol: TCP
    port: 9094
    targetPort: 9094
    name: kafka-tls

Producer

from kafka import KafkaProducer
import json

def main():
    bootstrap_servers = 'localhost:34715'
    topic = 'dev'
    message = {'message': 'Hello from the producer'}

    try:
        producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        print("Successful connection to Kafka server at:", bootstrap_servers)
        producer.send(topic, value=message)
        print("Message sent successfully to topic:", topic)
    except Exception as e:
        print("Error sending message:", e)

if __name__ == '__main__':
    main()

Consumer

from kafka import KafkaConsumer

def main():
    consumer = KafkaConsumer('dev', bootstrap_servers='localhost:34715')

    try:
        for message in consumer:
            print(f'Message received: {message.value.decode("utf-8")}')
    finally:
        consumer.close()

if __name__ == '__main__':
    main()
python docker kubernetes apache-kafka
1 Answer

Could you try calling flush() to make sure the message is actually sent, followed by close()?

import json
from kafka import KafkaProducer

def main():
    bootstrap_servers = 'localhost:34715'
    topic = 'dev'
    message = {'message': 'Hello from the producer'}

    # Start as None so the finally block is safe if the constructor raises
    producer = None
    try:
        producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        print("Successful connection to Kafka server at:", bootstrap_servers)
        # send() only queues the message; it is transmitted in the background
        producer.send(topic, value=message)
        # Flush the producer to ensure the message is sent
        producer.flush()
        print("Message sent successfully to topic:", topic)
    except Exception as e:
        print("Error sending message:", e)
    finally:
        # Close the producer (only if it was actually created)
        if producer is not None:
            producer.close()

if __name__ == '__main__':
    main()
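
If flushing alone doesn't tell you enough, one option is to block on the future that send() returns. As far as I know, flush() waits for pending sends to complete but does not raise when a delivery fails; the failure is reported on the future instead. Below is a minimal sketch, assuming kafka-python's KafkaProducer. send_and_confirm is a hypothetical helper name of my own, and the address and topic are simply the ones from the question.

```python
import json


def send_and_confirm(producer, topic, message, timeout=10):
    """Send one message and block until the broker acknowledges it.

    `producer` is expected to behave like kafka-python's KafkaProducer:
    send() returns a future, and future.get(timeout=...) either returns
    the record metadata or raises if delivery failed or timed out.
    """
    future = producer.send(topic, value=message)
    metadata = future.get(timeout=timeout)
    return metadata.topic, metadata.partition, metadata.offset


def main():
    # Imported here so the helper above can be tried without a broker.
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers='localhost:34715',  # address from the question
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    try:
        # Prints (topic, partition, offset) once the broker has accepted it
        print(send_and_confirm(producer, 'dev',
                               {'message': 'Hello from the producer'}))
    finally:
        producer.close()
```

Call main() with the pod running. If get() raises or times out, the broker never accepted the message, which points at the broker or listener setup rather than at the producer code.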