配置Spring Cloud Task以使用Spring Cloud Data Flow服务器的Kafa

问题描述 投票:0回答:1

我有一个Spring Cloud Data Flow (SCDF)服务器在Kubernetes cluster上运行,Kafka作为消息代理。现在我正在尝试发布一个写在Spring Cloud Task (SCT)主题的Kafka。我希望SCT使用与Kafka正在使用的相同的SCDF。这提出了我的两个问题,希望能够得到答案:

  1. 如何配置SCT使用与SCDF相同的Kafka?
  2. 是否可以配置SCT以便Kafka服务器uri在启动时可以自动传递给SCT,类似于在启动时传递给SCT的数据源属性?

由于我找不到任何关于如何实现这一点的例子,非常感谢帮助。

编辑:我自己的答案

这就是我如何让它适用于我的情况。我的SCT要求提供spring.kafka.bootstrap-servers。从SCDF的shell,我提供它作为参数--spring.kafka.bootstrap-servers=${KAFKA_SERVICE_HOST}:${KAFKA_SERVICE_PORT},其中KAFKA_SERVICE_HOSTKAFKA_SERVICE_PORT是由SCDF的k8s设置脚本创建的环境变量。

这是在SCDF的shell中启动任务的方法

dataflow:>task launch --name sample-task --arguments "--spring.kafka.bootstrap-servers=${KAFKA_SERVICE_HOST}:${KAFKA_SERVICE_PORT}"
spring-cloud-dataflow spring-cloud-task
1个回答
0
投票

您可能需要查看参考指南中的Spring Cloud Task Events部分。

期望您选择选择的活页夹并在Task应用程序的类路径中打包该库。有了这种依赖关系,您就可以使用Spring Cloud Stream的Kafka binder properties配置应用程序,例如spring.cloud.stream.kafka.binder.brokers以及与连接到现有Kafka集群相关的其他应用程序。

使用这些配置启动任务应用程序(来自SCDF)后,您就可以在任务应用程序中发布或接收事件。

或者,使用Task应用程序的类路径中的Kafka-binder,您可以将Kafka binder属性定义为SCDF通过全局配置启动的所有Task。参见参考文献中的Common Application Properties。指南了解更多信息。在此模型中,您不必显式配置每个Task应用程序与Kafka属性,而是在启动任务时SCDF会自动传播它们。请记住,这些属性将提供给所有Task启动。

© www.soinside.com 2019 - 2024. All rights reserved.