我正在尝试创建一个运行 Kafka 和 ZooKeeper 的 pod,并通过 Python 连接到它。但是,根据我当前的配置,尽管我可以成功连接到 Kafka 公开的端口,但我无法从 Python 客户端发送或接收消息。 我目前在 Pod 配置方面遇到问题。我已经定义了主题并签入了暴露 Kafka 服务端口的 Lens。我可以确认它正在连接到 Kafka,但是当我从生产者发送消息并访问容器时,它们都没有到达。同样,也没有从消费者那里收到任何信息。我已经排除了与端口或主题相关的错误,因为我已经彻底检查了它们。
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka-zookeeper
spec:
serviceName: kafka-zookeeper-service
replicas: 1
selector:
matchLabels:
app: kafka-zookeeper
template:
metadata:
labels:
app: kafka-zookeeper
spec:
containers:
- name: zookeeper
image: docker.io/bitnami/zookeeper:3.9
env:
- name: ALLOW_ANONYMOUS_LOGIN
value: "yes"
- name: ZOO_VOLUME_DIR
value: /bitnami/zookeeper
ports:
- containerPort: 2181
name: zookeeper
volumeMounts:
- name: zookeeper-data
mountPath: /bitnami/zookeeper
- name: kafka
image: docker.io/bitnami/kafka:3.4
env:
- name: KAFKA_CFG_ZOOKEEPER_CONNECT
value: kafka-zookeeper-service:2181
- name: KAFKA_VOLUME_DIR
value: /bitnami/kafka
- name: KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE
value: "true"
- name: KAFKA_CFG_PROCESS_ROLES
value: controller,broker
- name: KAFKA_CFG_LISTENERS
value: "PLAINTEXT://:9092,CONTROLLER://:9093"
- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
value: "1001@localhost:9093"
- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
value: "CONTROLLER"
- name: KAFKA_CFG_NODE_ID
value: "1001"
- name: KAFKA_CFG_ZOOKEEPER_METADATA_MIGRATION_ENABLE
value: "true"
ports:
- containerPort: 9092
name: kafka
- containerPort: 9093
name: kafka-plaintext
- containerPort: 9094
name: kafka-tls
volumeMounts:
- name: kafka-data
mountPath: /bitnami/kafka
volumes:
- name: zookeeper-data
persistentVolumeClaim:
claimName: zookeeper-data-pvc
- name: kafka-data
persistentVolumeClaim:
claimName: kafka-data-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: zookeeper-data-pvc
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 2Gi
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: kafka-data-pvc
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 2Gi
---
apiVersion: v1
kind: Service
metadata:
name: kafka-zookeeper-service
spec:
selector:
app: kafka-zookeeper
ports:
- protocol: TCP
port: 2181
targetPort: 2181
name: zookeeper
- protocol: TCP
port: 9092
targetPort: 9092
name: kafka
- protocol: TCP
port: 9093
targetPort: 9093
name: kafka-plaintext
- protocol: TCP
port: 9094
targetPort: 9094
name: kafka-tls
制作人
从kafka导入KafkaProducer 导入 json
def main():
bootstrap_servers = 'localhost:34715'
topic = 'dev'
message = {'message': 'Hello from the producer'}
try:
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
print("Successful connection to Kafka server at:", bootstrap_servers)
producer.send(topic, value=message)
print("Message sent successfully to topic:", topic)
except Exception as e:
print("Error sending message:", e)
if __name__ == '__main__':
main()
消费者
from kafka import KafkaConsumer
def main():
consumer = KafkaConsumer('dev', bootstrap_servers='localhost:34715')
try:
for message in consumer:
print(f'Message received: {message.value.decode("utf-8")}')
finally:
consumer.close()
if __name__ == '__main__':
main()
您可以尝试
flush
确保消息立即发送并close
功能吗?
import json
from kafka import KafkaProducer
def main():
bootstrap_servers = 'localhost:34715'
topic = 'dev'
message = {'message': 'Hello from the producer'}
try:
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
print("Successful connection to Kafka server at:", bootstrap_servers)
producer.send(topic, value=message)
print("Message sent successfully to topic:", topic)
# Flush the producer to ensure the message is sent
producer.flush()
except Exception as e:
print("Error sending message:", e)
finally:
# Close the producer
producer.close()
if __name__ == '__main__':
main()