我有一个Spring Cloud Data Flow (SCDF)
服务器在Kubernetes cluster
上运行,Kafka
作为消息代理。现在我正在尝试发布一个写在Spring Cloud Task (SCT)
主题的Kafka
。我希望SCT使用与Kafka
正在使用的相同的SCDF
。这提出了我的两个问题,希望能够得到答案:
- 如何配置SCT使用与SCDF相同的Kafka?
- 是否可以配置SCT以便Kafka服务器uri在启动时可以自动传递给SCT,类似于在启动时传递给SCT的数据源属性?
由于我找不到任何关于如何实现这一点的例子,非常感谢帮助。
编辑:我自己的答案
这就是我如何让它适用于我的情况。我的SCT要求提供spring.kafka.bootstrap-servers
。从SCDF的shell,我提供它作为参数--spring.kafka.bootstrap-servers=${KAFKA_SERVICE_HOST}:${KAFKA_SERVICE_PORT}
,其中KAFKA_SERVICE_HOST
和KAFKA_SERVICE_PORT
是由SCDF的k8s设置脚本创建的环境变量。
这是在SCDF的shell中启动任务的方法
dataflow:>task launch --name sample-task --arguments "--spring.kafka.bootstrap-servers=${KAFKA_SERVICE_HOST}:${KAFKA_SERVICE_PORT}"
您可能需要查看参考指南中的Spring Cloud Task Events
部分。
期望您选择选择的活页夹并在Task应用程序的类路径中打包该库。有了这种依赖关系,您就可以使用Spring Cloud Stream的Kafka binder properties配置应用程序,例如spring.cloud.stream.kafka.binder.brokers
以及与连接到现有Kafka集群相关的其他应用程序。
使用这些配置启动任务应用程序(来自SCDF)后,您就可以在任务应用程序中发布或接收事件。
或者,使用Task应用程序的类路径中的Kafka-binder,您可以将Kafka binder属性定义为SCDF通过全局配置启动的所有Task。参见参考文献中的Common Application Properties
。指南了解更多信息。在此模型中,您不必显式配置每个Task应用程序与Kafka属性,而是在启动任务时SCDF会自动传播它们。请记住,这些属性将提供给所有Task启动。