在我的项目中,我需要连接到两个不同的 Kafka 代理。
我的
application.yaml
看起来有点像这样:
spring:
cloud:
function:
definition: orderCreatedListener;orderProcessedListener
stream:
bindings:
orderCreatedProducer-out-0:
destination: order-created
binder: kafka-one
orderCreatedListener-in-0:
destination: order-created
group: spot
binder: kafka-one
orderCreatedListener-out-0:
destination: order-processed
binder: kafka-two
orderProcessedListener-in-0:
destination: order-processed
group: spot
binder: kafka-two
kafka:
binder:
auto-create-topics: true
bindings:
orderCreatedListener-in-0:
consumer:
enableDlq: true
dlqName: order-created-dlq
autoCommitOnError: true
autoCommitOffset: true
orderProcessedListener-in-0:
consumer:
enableDlq: true
dlqName: order-processed-dlq
autoCommitOnError: true
autoCommitOffset: true
binders:
kafka-one:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
kafka-two:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9093
但是当我运行应用程序时它不起作用,这导致了以下错误:
2024-03-05T23:35:48.473-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 31 due to node 1001 being disconnected (elapsed time since creation: 4ms, elapsed time since send: 4ms, request timeout: 3600000ms)
2024-03-05T23:35:49.595-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Node 1001 disconnected.
2024-03-05T23:35:49.595-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 32 due to node 1001 being disconnected (elapsed time since creation: 5ms, elapsed time since send: 5ms, request timeout: 3600000ms)
2024-03-05T23:35:50.727-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Node 1001 disconnected.
2024-03-05T23:35:50.728-03:00 INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 33 due to node 1001 being disconnected (elapsed time since creation: 4ms, elapsed time since send: 4ms, request timeout: 3600000ms)
2024-03-05T23:35:51.086-03:00 INFO 25569 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata
我想将 Kafka 主题分成两个集群:
order-created
和 order-created-dlq
order-processed
和 order-processed-dlq
我使用:
我有两个 Kafka 集群在带有 docker 容器的开发环境中运行良好,一个暴露在 9092 端口上,另一个暴露在 9093 端口上。
如何调整?
我认为您在here定义的 Kafka 设置存在一些问题。相反,我使用了 Spring Cloud Stream 为测试需求提供的指定内容。该给出了一个 3 节点集群,其中包含
localhost:9091
、localhost:9092
和 localhost:9093
。您可以在 9092
分支的应用程序中使用在 9093
和 feature/multiple-kafka-brokers
上运行的程序。通过将这些节点用作 Kafka 代理,我在运行应用程序时没有看到任何错误。运行自述文件中的 CURL
命令时,我在控制台中看到以下输出:
2024-03-07T15:20:55.577-05:00 INFO 64186 --- [container-0-C-1] c.c.s.s.i.o.l.OrderProcessedListener : {"order_id":"89c6ea7a-6fe5-4fa9-91c6-733a5f603b10","customer_id":"36a8ea26-4eb0-4b9d-b609-d095175a2f7b","value":1000.00,"status":"REJECTED"}
2024-03-07T15:22:25.885-05:00 INFO 64186 --- [container-0-C-1] c.c.s.s.i.o.l.OrderProcessedListener : {"order_id":"cf22490f-6d4c-477a-99ba-2088cee07804","customer_id":"36a8ea26-4eb0-4b9d-b609-d095175a2f7b","value":400.00,"status":"APPROVED"}
因此,这告诉我
application.yaml
中的代码或配置没有任何问题;相反,它表明 Kafka 代理本身存在一些配置/连接问题,可能是因为您在 docker-compose 脚本中设置它们的方式所致。我建议开始查看那里,看看是否发现任何问题。