Spring Cloud Stream 多个 Kafka 集群配置

问题描述 投票:0回答:1

在我的项目中,我需要连接到两个不同的 Kafka 代理。

我的

application.yaml
看起来有点像这样:

spring:
  cloud:
    function:
      definition: orderCreatedListener;orderProcessedListener
    stream:
      bindings:
        orderCreatedProducer-out-0:
          destination: order-created
          binder: kafka-one
        orderCreatedListener-in-0:
          destination: order-created
          group: spot
          binder: kafka-one
        orderCreatedListener-out-0:
          destination: order-processed
          binder: kafka-two
        orderProcessedListener-in-0:
          destination: order-processed
          group: spot
          binder: kafka-two
      kafka:
        binder:
          auto-create-topics: true
        bindings:
          orderCreatedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-created-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
      binders:
        kafka-one:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
        kafka-two:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093

但是当我运行应用程序时它不起作用,这导致了以下错误:

2024-03-05T23:35:48.473-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 31 due to node 1001 being disconnected (elapsed time since creation: 4ms, elapsed time since send: 4ms, request timeout: 3600000ms)
2024-03-05T23:35:49.595-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node 1001 disconnected.
2024-03-05T23:35:49.595-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 32 due to node 1001 being disconnected (elapsed time since creation: 5ms, elapsed time since send: 5ms, request timeout: 3600000ms)
2024-03-05T23:35:50.727-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node 1001 disconnected.
2024-03-05T23:35:50.728-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 33 due to node 1001 being disconnected (elapsed time since creation: 4ms, elapsed time since send: 4ms, request timeout: 3600000ms)
2024-03-05T23:35:51.086-03:00  INFO 25569 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager         : [AdminClient clientId=adminclient-1] Metadata update failed

org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata

我想将 Kafka 主题分成两个集群:

  • kafka-包含
    order-created
    order-created-dlq
  • kafka-two 包含
    order-processed
    order-processed-dlq

我使用:

  • Spring Boot 3.2.3
  • 春云2023.0.0

我有两个 Kafka 集群在带有 docker 容器的开发环境中运行良好,一个暴露在 9092 端口上,另一个暴露在 9093 端口上。

如何调整?

spring spring-cloud spring-cloud-stream spring-cloud-stream-binder-kafka spring-cloud-stream-binder
1个回答
0
投票

我认为您在here定义的 Kafka 设置存在一些问题。相反,我使用了 Spring Cloud Stream 为测试需求提供的指定内容。该给出了一个 3 节点集群,其中包含

localhost:9091
localhost:9092
localhost:9093
。您可以在
9092
分支的应用程序中使用在
9093
feature/multiple-kafka-brokers
上运行的程序。通过将这些节点用作 Kafka 代理,我在运行应用程序时没有看到任何错误。运行自述文件中的
CURL
命令时,我在控制台中看到以下输出:

2024-03-07T15:20:55.577-05:00  INFO 64186 --- [container-0-C-1] c.c.s.s.i.o.l.OrderProcessedListener     : {"order_id":"89c6ea7a-6fe5-4fa9-91c6-733a5f603b10","customer_id":"36a8ea26-4eb0-4b9d-b609-d095175a2f7b","value":1000.00,"status":"REJECTED"}
2024-03-07T15:22:25.885-05:00  INFO 64186 --- [container-0-C-1] c.c.s.s.i.o.l.OrderProcessedListener     : {"order_id":"cf22490f-6d4c-477a-99ba-2088cee07804","customer_id":"36a8ea26-4eb0-4b9d-b609-d095175a2f7b","value":400.00,"status":"APPROVED"}

因此,这告诉我

application.yaml
中的代码或配置没有任何问题;相反,它表明 Kafka 代理本身存在一些配置/连接问题,可能是因为您在 docker-compose 脚本中设置它们的方式所致。我建议开始查看那里,看看是否发现任何问题。

© www.soinside.com 2019 - 2024. All rights reserved.