我一直在为Spring Cloud Data Flow在Scala中创建一个简单的自定义处理器,但在向启动应用程序发送接收数据时遇到了问题。我无法看到任何通过流传播的消息。流的定义是 time --trigger.time-unit=SECONDS | pass-through-log | log
哪儿 pass-through-log
是我的自定义处理器。
我使用的是Spring Cloud Data Flow 2.5.1和Spring Boot 2.2.6。
以下是用于处理器的代码--我使用的是功能模型。
@SpringBootApplication
class PassThroughLog {
@Bean
def passthroughlog(): Function[String, String] = {
input: String => {
println(s"Received input `$input`")
input
}
}
}
object PassThroughLog {
def main(args: Array[String]): Unit = SpringApplication.run(classOf[PassThroughLog], args: _ *)
}
应用程序.yml
spring:
cloud:
stream:
function:
bindings:
passthroughlog-in-0: input
passthroughlog-out-0: output
build.gradle.kts
// scala
implementation("org.scala-lang:scala-library:2.12.10")
// spring
implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.cloud:spring-cloud-starter-function-web:3.0.7.RELEASE")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.5.RELEASE")
我已将整个项目发布到 github 如果这里的代码样本不足。我也把日志发到了那里,因为日志比较长。
当我启动一个本地Kafka集群,并将任意数据推送给 input
题,我能够看到数据流经处理器。然而,当我在Spring Cloud Data Flow上部署应用程序时,情况并非如此。我是通过Kubernetes中的Docker部署应用程序的。
此外,当我部署一个定义为 time --trigger.time-unit=SECONDS | log
,我在日志汇中看到了信息。这让我确信问题出在自定义处理器上。
我是否遗漏了一些简单的东西,比如依赖关系或额外的配置?任何帮助都是非常感激的。
当在SCDF中使用Spring Cloud Stream 3.x版本时,您必须设置一个额外的属性来让SCDF知道哪些通道绑定被配置为输入和输出通道。
请看。功能性应用
特别注意以下属性。
app.time-source.spring.cloud.stream.function.bindings.timeSupplier-out-0=输出。
app.log-sink.spring.cloud.stream.function.bindings.logConsumer-in-0=input。
在您的情况下,您将不得不映射 passthroughlog-in-0
和 passthroughlog-out-0
函数绑定到 input
和 output
分别。
原来问题出在我的Dockerfile上。为了方便配置,我在构建参数中指定了在Docker文件中使用的jar文件。ENTRYPOINT
. 为了达到这个目的,我使用了shell版的 ENTRYPOINT
. 改变我的 ENTRYPOINT
到exec版本,解决了我的问题。
shell版本的 ENTRYPOINT
不能很好地使用图像参数 (docker run <image> <args>
),因此SCDF无法将适当的参数传递给容器。
将我的Docker文件从:
FROM openjdk:11.0.5-jdk-slim as build
ARG JAR
ENV JAR $JAR
ADD build/libs/$JAR .
ENTRYPOINT java -jar $JAR
改为
FROM openjdk:11.0.5-jdk-slim as build
ARG JAR
ADD build/libs/$JAR program.jar
ENTRYPOINT ["java", "-jar", "program.jar"]
解决了这个问题。