目前我正在使用 EKS/Kubernetes 集群,并将 Apache kafka 安装为服务。
在集群内部,我可以使用以下命令执行所有 kafka 操作,例如创建、删除、使用或读取主题:
#List topics
kubectl exec -it <My pod> -- kafka-topics.sh --list --bootstrap-server kafka.<My namespace>.svc.cluster.local:9092
#Create topic
kubectl exec -it <My pod> -- kafka-topics.sh --create --topic test --partitions 3 --replication-factor 1 --bootstrap-server kafka.<My namespace>.svc.cluster.local:9092
#Delete topic
kubectl exec -it <My pod> -- kafka-topics.sh --delete --topic test --bootstrap-server kafka.<My namespace>.svc.cluster.local:9092
#Produce
kubectl exec -it <My pod> -- kafka-console-producer.sh --topic test --bootstrap-server kafka.<My namespace>.svc.cluster.local:9092 < value.json
#Consume
kubectl exec -it <My pod> -- kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server kafka.<My namespace>.svc.cluster.local:9092
现在我想用 lambda 或一些 python 代码来摄取数据而不进入集群,如
from confluent_kafka.admin import AdminClient, NewTopic
def list_kafka_topics(bootstrap_servers):
admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})
topics = admin_client.list_topics().topics
topic_names = list(topics.keys())
return topic_names
bootstrap_servers = 'BOOTSTRAPS'
topics = list_kafka_topics(bootstrap_servers)
print("Lista de Topics en Kafka:")
for topic in topics:
print(topic)
总是出现的错误是:
Failed to resolve 'kafka:9092': No such host is known. (after 2703ms in state CONNECT, 1 identical error(s) suppressed)
有人可以帮助我将数据提取到 EKS 中已安装的 kafka 集群吗?或者总是以获取数据为目的制定新策略。
如果我理解正确的话,您的 Kafka 服务正在 Pod 内运行。
要从集群外部访问 Kubernetes Pod(即服务),您通常会使用 Kubernetes
Ingress
或 LoadBalancer
将服务暴露给外部世界。这允许外部流量到达您的服务。
然后使用 Lambda 内的入口端点。