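I'm trying to set up Kafka and a Schema Registry via Testcontainers.
I've drafted a setup that starts both containers successfully, but I'm having trouble consuming from outside the containers.
In particular, I'm trying to connect a Kafka admin client. Some admin commands (see CALL 1 below) seem to execute fine. Others (see CALL 2 below) fail, and this looks like the likely error: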
[thrd:kafka:9092/1]: kafka:9092/1: Failed to resolve 'kafka:9092': Name or service not known (after 12ms in state CONNECT)
[thrd:app]: rdkafka#producer-1: kafka:9092/1: Failed to resolve 'kafka:9092': Name or service not known (after 12ms in state CONNECT))
I currently have no idea why this happens. Can anyone spot a problem with my configuration?
My current setup is as follows:
[Test]
public async Task my_setup()
{
    var network = new NetworkBuilder().WithName(Guid.NewGuid().ToString("D")).Build();
    await network.CreateAsync();

    var kafkaContainer = BuildKafkaContainer(network);
    await kafkaContainer.StartAsync();

    var schemaRegistryContainer = BuildSchemaRegistryContainer(network);
    await schemaRegistryContainer.StartAsync();

    var kafkaAdminClient = new AdminClientBuilder(new AdminClientConfig {
        BootstrapServers = kafkaContainer.GetBootstrapAddress(),
        SecurityProtocol = SecurityProtocol.Plaintext,
    }).Build();

    // CALL 1: This call works fine and returns empty data as expected
    var meta = kafkaAdminClient.GetMetadata(TimeSpan.FromSeconds(10));

    // CALL 2: This call eventually times out, with the likely cause in the debug output:
    //
    // [thrd:kafka:9092/1]: kafka:9092/1: Failed to resolve 'kafka:9092': Name or service not known (after 12ms in state CONNECT)
    // [thrd:app]: rdkafka#producer-1: kafka:9092/1: Failed to resolve 'kafka:9092': Name or service not known (after 12ms in state CONNECT))
    // .. messages repeating ..
    await kafkaAdminClient.CreateTopicsAsync(new[] { new TopicSpecification
    {
        Name = KafkaTopicName,
        ReplicationFactor = 1,
        NumPartitions = 1,
    }}, new CreateTopicsOptions
    {
        OperationTimeout = TimeSpan.FromSeconds(60),
        RequestTimeout = TimeSpan.FromSeconds(60)
    });
}

private KafkaContainer BuildKafkaContainer(INetwork network)
{
    return new KafkaBuilder()
        .WithImage("confluentinc/cp-kafka:7.5.2")
        .WithNetworkAliases("kafka")
        .WithNetwork(network)
        .WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://kafka:{KafkaBuilder.KafkaPort},BROKER://0.0.0.0:{KafkaBuilder.BrokerPort}")
        .WithStartupCallback((container, ct) =>
        {
            // Same as the default WithStartupCallback implementation all the way up until...
            const char lf = '\n';
            var startupScript = new StringBuilder();
            startupScript.Append("#!/bin/bash");
            startupScript.Append(lf);
            startupScript.Append($"echo 'clientPort={KafkaBuilder.ZookeeperPort}' > zookeeper.properties");
            startupScript.Append(lf);
            startupScript.Append("echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties");
            startupScript.Append(lf);
            startupScript.Append("echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties");
            startupScript.Append(lf);
            startupScript.Append("zookeeper-server-start zookeeper.properties &");
            startupScript.Append(lf);
            // ... all the way up until here: We override KAFKA_ADVERTISED_LISTENERS
            startupScript.Append($"export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:{KafkaBuilder.KafkaPort},BROKER://{container.IpAddress}:{KafkaBuilder.BrokerPort}");
            // ... and back to copy & pasting the default implementation
            startupScript.Append(lf);
            startupScript.Append("echo '' > /etc/confluent/docker/ensure");
            startupScript.Append(lf);
            startupScript.Append("/etc/confluent/docker/run");
            return container.CopyAsync(Encoding.Default.GetBytes(startupScript.ToString()), KafkaBuilder.StartupScriptFilePath, Unix.FileMode755, ct);
        })
        .Build();
}

private IContainer BuildSchemaRegistryContainer(INetwork network)
{
    return new ContainerBuilder()
        .WithImage("confluentinc/cp-schema-registry:7.5.2")
        .WithNetwork(network)
        .WithExposedPort(8081)
        .WithPortBinding(8081, true)
        .WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
        .WithEnvironment("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
        .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", $"PLAINTEXT://kafka:{KafkaBuilder.KafkaPort}")
        .WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged("Server started, listening for requests..."))
        .Build();
}
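
The listener strings in the setup above can be sanity-checked from the test process itself. The sketch below is not part of the original setup: `ListenerCheck`, `Parse`, and `CanResolve` are hypothetical helper names, and it uses only the .NET standard library. It splits a `KAFKA_ADVERTISED_LISTENERS`-style value into its parts and reports whether each host name resolves on the machine running the test. A Docker network alias such as `kafka` is normally resolvable only from containers attached to that network, so it would be expected to fail when checked from the host.

```csharp
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;

// Hypothetical helper for illustration; not part of Testcontainers or Confluent.Kafka.
public static class ListenerCheck
{
    // Splits "NAME://host:port,NAME2://host2:port2" into (Name, Host, Port) entries.
    public static List<(string Name, string Host, int Port)> Parse(string listeners)
    {
        var result = new List<(string Name, string Host, int Port)>();
        foreach (var entry in listeners.Split(',', StringSplitOptions.RemoveEmptyEntries))
        {
            var parts = entry.Trim().Split("://", 2);
            if (parts.Length != 2)
            {
                throw new FormatException($"Expected NAME://host:port but got '{entry}'.");
            }

            // The port follows the last colon of the host:port part.
            var lastColon = parts[1].LastIndexOf(':');
            if (lastColon < 0)
            {
                throw new FormatException($"Missing port in '{entry}'.");
            }

            var host = parts[1].Substring(0, lastColon);
            var port = int.Parse(parts[1].Substring(lastColon + 1));
            result.Add((parts[0], host, port));
        }
        return result;
    }

    // Returns true if the current machine can resolve the host name (or IP literal).
    public static bool CanResolve(string host)
    {
        try
        {
            return Dns.GetHostAddresses(host).Length > 0;
        }
        catch (SocketException)
        {
            return false;
        }
    }
}
```

Calling `ListenerCheck.Parse("PLAINTEXT://kafka:9092,BROKER://172.17.0.2:9093")` and passing each `Host` to `ListenerCheck.CanResolve` from inside the test shows which advertised addresses a client running on the host could actually reach by name.
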
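In case anyone runs into the same problem in the future: I managed to "solve" it by effectively leaving the default configuration untouched and adding an extra, third listener that the schema registry can connect to: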
new KafkaBuilder()
    // ...
    .WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://0.0.0.0:{KafkaBuilder.KafkaPort},BROKER://0.0.0.0:{KafkaBuilder.BrokerPort},PLAINTEXT_EXTERNAL://kafka:9876")
    .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_EXTERNAL:PLAINTEXT")
    .WithStartupCallback((container, ct) =>
    {
        // ...
        startupScript.Append($"export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{container.Hostname}:{container.GetMappedPublicPort(KafkaBuilder.KafkaPort)},BROKER://{container.Hostname}:{KafkaBuilder.BrokerPort},PLAINTEXT_EXTERNAL://kafka:9876");
        // ...
    })
However, I'll leave this question open, because I'm still not sure why the original approach didn't work.
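
One way to investigate the original failure is to print the broker addresses the client learns from the cluster metadata. This is a sketch, not something from the original setup: `BrokerDiagnostics` and `PrintAdvertisedBrokers` are hypothetical names, and it assumes the `Metadata.Brokers` list exposed by the Confluent.Kafka client. If it prints `kafka:9092` for the original configuration, that would fit the log output: the bootstrap connection through the mapped port can answer CALL 1, while a request that has to reach a specific broker (topic creation is typically sent to the controller) goes to the advertised address, which the host cannot resolve. That explanation is an assumption and has not been verified against this setup.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical diagnostic helper; requires the Confluent.Kafka package and a running broker.
public static class BrokerDiagnostics
{
    public static void PrintAdvertisedBrokers(string bootstrapServers)
    {
        using var admin = new AdminClientBuilder(new AdminClientConfig
        {
            BootstrapServers = bootstrapServers,
        }).Build();

        // The metadata response lists each broker with the host and port it advertises.
        var meta = admin.GetMetadata(TimeSpan.FromSeconds(10));
        foreach (var broker in meta.Brokers)
        {
            Console.WriteLine($"broker {broker.BrokerId} advertises {broker.Host}:{broker.Port}");
        }
    }
}
```

In the test above it could be called as `BrokerDiagnostics.PrintAdvertisedBrokers(kafkaContainer.GetBootstrapAddress());` right after the container has started.
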