使用 Kafka 客户端运行最基本的测试会导致错误
NoBrokersAvailable
。
我的 Github 工作流程设置:
test:
runs-on: ubuntu-latest
services:
zoo_keeper:
image: bitnami/zookeeper
ports:
- 2181:2181
env:
ALLOW_ANONYMOUS_LOGIN: yes
options: >-
--health-cmd "echo mntr | nc -w 2 -q 2 localhost 2181"
--health-interval 10s
--health-timeout 5s
--health-retries 5
kafka:
image: bitnami/kafka
ports:
- 9092:9092
options: >-
--health-cmd "kafka-broker-api-versions.sh --version"
--health-interval 10s
--health-timeout 5s
--health-retries 5
env:
KAFKA_CFG_KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_CFG_LISTENERS: "CLIENT://:9092,INTERNAL://:9093"
KAFKA_CFG_ADVERTISED_LISTENERS: "CLIENT://localhost:9092,INTERNAL://kafka:9093"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CLIENT:PLAINTEXT,INTERNAL:PLAINTEXT"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
...
steps:
- name: test
...
run: pytest
在本地(在
docker-windows
上)运行相同的设置并在本地(而不是从容器)运行测试可以正常工作并成功连接到 Kafka。
我的测试使用
KAFKA_URL=localhost:9092
并在 Github 运行器主机(而不是容器)上运行。
在另一个测试中,我成功连接到 postgres
(使用 localhost 作为主机名)。
我确信这是一些配置错误,我错过了什么?
您的设置不起作用的主要原因可能是因为您的 Zookeeper 服务名为
zoo_keeper
,带有下划线,并且您配置 kafka 连接到 zookeeper:2181
。
如果我将您的 Zookeeper 服务的名称更改为
zookeeper
并为 kafka 服务使用以下环境变量,它对我有用:
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
您可以使用以下自定义 GitHub Action,它将在 GitHub Actions 管道中创建 Kafka (KRaft) Broker。
然后您可以使用以下侦听器之一连接到它:
localhost:9092
$kafka_runner_address:9093
(kafka_runner_address是由上述自定义github操作创建的环境变量)。只需更新您的 GitHub 工作流程以包含以下内容(您可以指定要使用的 kafka 版本 [>3.3] 以及您想要创建的主题/分区):
- name: Run Kafka KRaft Broker
uses: spicyparrot/[email protected]
with:
kafka-version: "3.6.1"
kafka-topics: "foo,1,bar,3"
- name: 🧪 Run PyTests
run: pytest
然后在你的 pytests 中,只需将
bootstrap.servers
配置为指向 localhost:9092
或 $kafka_runner_address:9093
,例如:
import os
from confluent_kafka import Producer
kafka_endpoint = os.getenv("kafka_runner_address")
producer_config = {
'bootstrap.servers': (kafka_endpoint + ':9093') if kafka_endpoint else 'localhost:9092'
}
# Instantiate a producer
producer = Producer(producer_config)
希望这有帮助!