Kafka + Schema Registry in .NET Testcontainers: unable to resolve the kafka host


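I'm trying to set up Kafka and the Schema Registry through Testcontainers.

I've drafted a setup that starts both containers successfully, but I'm having trouble consuming from outside the containers.

In particular, I'm trying to connect a Kafka admin client. Some admin commands (see CALL 1 below) seem to execute fine. Others (see CALL 2 below) fail, and the following debug output looks like the probable cause: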

[thrd:kafka:9092/1]: kafka:9092/1: Failed to resolve 'kafka:9092': Name or service not known (after 12ms in state CONNECT)
[thrd:app]: rdkafka#producer-1: kafka:9092/1: Failed to resolve 'kafka:9092': Name or service not known (after 12ms in state CONNECT)

I currently have no idea why this happens. Can anyone spot what is wrong with my configuration?

My current setup is as follows:


   [Test]
   public async Task my_setup()
   {
      var network = new NetworkBuilder().WithName(Guid.NewGuid().ToString("D")).Build();
      await network.CreateAsync();

      var kafkaContainer = BuildKafkaContainer(network);
      await kafkaContainer.StartAsync();

      var schemaRegistryContainer = BuildSchemaRegistryContainer(network);
      await schemaRegistryContainer.StartAsync();

      var kafkaAdminClient = new AdminClientBuilder(new AdminClientConfig {
        BootstrapServers = kafkaContainer.GetBootstrapAddress(),
        SecurityProtocol = SecurityProtocol.Plaintext,
      }).Build();

      // CALL 1: This call works fine and returns empty data as expected
      var meta = kafkaAdminClient.GetMetadata(TimeSpan.FromSeconds(10));

      // CALL 2: This call eventually times out, with the likely cause in the debug output:
      //
      // [thrd:kafka:9092/1]: kafka:9092/1: Failed to resolve 'kafka:9092': Name or service not known (after 12ms in state CONNECT)
      // [thrd:app]: rdkafka#producer-1: kafka:9092/1: Failed to resolve 'kafka:9092': Name or service not known (after 12ms in state CONNECT)
      // .. messages repeating ..
      await kafkaAdminClient.CreateTopicsAsync(new[] { new TopicSpecification
      {
        Name = KafkaTopicName,
        ReplicationFactor = 1,
        NumPartitions = 1,
      }}, new CreateTopicsOptions
      {
        OperationTimeout = TimeSpan.FromSeconds(60),
        RequestTimeout = TimeSpan.FromSeconds(60)
      });
   }

    private KafkaContainer BuildKafkaContainer(INetwork network)
    {
        return new KafkaBuilder()
            .WithImage("confluentinc/cp-kafka:7.5.2")
            .WithNetworkAliases("kafka")
            .WithNetwork(network)
            .WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://kafka:{KafkaBuilder.KafkaPort},BROKER://0.0.0.0:{KafkaBuilder.BrokerPort}")
            .WithStartupCallback((container, ct) =>
            {
                // Same as the default WithStartupCallback implementation all the way up until...
                const char lf = '\n';
                var startupScript = new StringBuilder();
                startupScript.Append("#!/bin/bash");
                startupScript.Append(lf);
                startupScript.Append($"echo 'clientPort={KafkaBuilder.ZookeeperPort}' > zookeeper.properties");
                startupScript.Append(lf);
                startupScript.Append("echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties");
                startupScript.Append(lf);
                startupScript.Append("echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties");
                startupScript.Append(lf);
                startupScript.Append("zookeeper-server-start zookeeper.properties &");
                startupScript.Append(lf);

                // ... all the way up until here: We override KAFKA_ADVERTISED_LISTENERS
                startupScript.Append($"export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:{KafkaBuilder.KafkaPort},BROKER://{container.IpAddress}:{KafkaBuilder.BrokerPort}");

                // ... and back to copy & pasting the default implementation
                startupScript.Append(lf);
                startupScript.Append("echo '' > /etc/confluent/docker/ensure");
                startupScript.Append(lf);
                startupScript.Append("/etc/confluent/docker/run");
                return container.CopyAsync(Encoding.Default.GetBytes(startupScript.ToString()), KafkaBuilder.StartupScriptFilePath, Unix.FileMode755, ct);
            })
            .Build();
    }

    private IContainer BuildSchemaRegistryContainer(INetwork network)
    {
        return new ContainerBuilder()
            .WithImage("confluentinc/cp-schema-registry:7.5.2")
            .WithNetwork(network)
            .WithExposedPort(8081)
            .WithPortBinding(8081, true)
            .WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
            .WithEnvironment("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
            .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", $"PLAINTEXT://kafka:{KafkaBuilder.KafkaPort}")
            .WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged("Server started, listening for requests..."))
            .Build();
    }
c# docker apache-kafka confluent-schema-registry testcontainers
1 Answer

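In case anyone runs into the same problem in the future: I managed to "solve" it by effectively leaving the default configuration untouched and adding an extra, third listener that the Schema Registry can connect to: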

new KafkaBuilder()
  // ...
  .WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://0.0.0.0:{KafkaBuilder.KafkaPort},BROKER://0.0.0.0:{KafkaBuilder.BrokerPort},PLAINTEXT_EXTERNAL://kafka:9876")
  .WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_EXTERNAL:PLAINTEXT")
  .WithStartupCallback((container, ct) =>
  {
    // ...
    startupScript.Append($"export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{container.Hostname}:{container.GetMappedPublicPort(KafkaBuilder.KafkaPort)},BROKER://{container.Hostname}:{KafkaBuilder.BrokerPort},PLAINTEXT_EXTERNAL://kafka:9876");
    // ...
  })
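
With the third listener in place, the Schema Registry container would presumably point at it instead of at the default PLAINTEXT listener. The following is a minimal sketch of that wiring, not a confirmed part of the working setup: it reuses the builder calls from the question, and it assumes the Kafka container still joins the same network with the alias kafka and that 9876 is the PLAINTEXT_EXTERNAL port chosen above. The class and constant names are hypothetical.

```csharp
using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Containers;
using DotNet.Testcontainers.Networks;

public static class SchemaRegistrySketch
{
    // Assumption: matches the PLAINTEXT_EXTERNAL://kafka:9876 listener defined above.
    private const int SharedNetworkListenerPort = 9876;

    public static IContainer Build(INetwork network)
    {
        return new ContainerBuilder()
            .WithImage("confluentinc/cp-schema-registry:7.5.2")
            .WithNetwork(network)
            .WithPortBinding(8081, true)
            .WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
            .WithEnvironment("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
            // Connect over the shared Docker network, where the alias "kafka" resolves.
            .WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", $"PLAINTEXT://kafka:{SharedNetworkListenerPort}")
            .WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged("Server started, listening for requests..."))
            .Build();
    }
}
```

Clients on the host keep using kafkaContainer.GetBootstrapAddress(), which goes through the default PLAINTEXT listener and its mapped port.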

However, I'm leaving this question open, because I'm still not sure why the original approach didn't work.
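
One plausible explanation, offered as an assumption and not confirmed in this thread: a Kafka client uses the bootstrap address only for its first connection and then connects to whatever addresses the broker advertises. In the original setup the PLAINTEXT listener was advertised as kafka:9092, a name that resolves only inside the Docker network. GetMetadata can be answered over the bootstrap connection, whereas CreateTopicsAsync has to reach a specific broker at its advertised address, which would match the "Failed to resolve 'kafka:9092'" lines. A small sketch for checking this prints the advertised broker addresses; the class and method names are hypothetical, while the Confluent.Kafka calls are the same ones used in the question.

```csharp
using System;
using Confluent.Kafka;

public static class BrokerAddressProbe
{
    // Prints the host and port each broker advertises to clients.
    public static void PrintAdvertisedBrokers(string bootstrapServers)
    {
        using var adminClient = new AdminClientBuilder(new AdminClientConfig
        {
            BootstrapServers = bootstrapServers,
        }).Build();

        // Served over the bootstrap connection, so it works even when the
        // advertised addresses are unreachable from the host.
        var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));

        foreach (var broker in metadata.Brokers)
        {
            Console.WriteLine($"broker {broker.BrokerId}: {broker.Host}:{broker.Port}");
        }
    }
}
```

Calling BrokerAddressProbe.PrintAdvertisedBrokers(kafkaContainer.GetBootstrapAddress()) from the test should show whether the broker hands out a host name that the test process cannot resolve.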
