Spring Cloud Data Flow自定义Scala处理器无法从启动应用程序发送接收数据(SCDF 2.5.1 & Spring Boot 2.2.6)。

问题描述 投票:0回答:1

我一直在为Spring Cloud Data Flow在Scala中创建一个简单的自定义处理器,但在向启动应用程序发送接收数据时遇到了问题。我无法看到任何通过流传播的消息。流的定义是 time --trigger.time-unit=SECONDS | pass-through-log | log 哪儿 pass-through-log 是我的自定义处理器。

我使用的是Spring Cloud Data Flow 2.5.1和Spring Boot 2.2.6。

以下是用于处理器的代码--我使用的是功能模型。

@SpringBootApplication
class PassThroughLog {

  @Bean
  def passthroughlog(): Function[String, String] = {
    input: String => {
      println(s"Received input `$input`")
      input
    }
  }
}

object PassThroughLog {
  def main(args: Array[String]): Unit = SpringApplication.run(classOf[PassThroughLog], args: _ *)
}

应用程序.yml

spring:
  cloud:
    stream:
      function:
        bindings:
          passthroughlog-in-0: input
          passthroughlog-out-0: output

build.gradle.kts

// scala
implementation("org.scala-lang:scala-library:2.12.10")

// spring
implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.cloud:spring-cloud-starter-function-web:3.0.7.RELEASE")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.5.RELEASE")

我已将整个项目发布到 github 如果这里的代码样本不足。我也把日志发到了那里,因为日志比较长。

当我启动一个本地Kafka集群,并将任意数据推送给 input 题,我能够看到数据流经处理器。然而,当我在Spring Cloud Data Flow上部署应用程序时,情况并非如此。我是通过Kubernetes中的Docker部署应用程序的。

此外,当我部署一个定义为 time --trigger.time-unit=SECONDS | log,我在日志汇中看到了信息。这让我确信问题出在自定义处理器上。

我是否遗漏了一些简单的东西,比如依赖关系或额外的配置?任何帮助都是非常感激的。

scala spring-boot spring-cloud-stream spring-cloud-dataflow spring-cloud-stream-binder-kafka
1个回答
0
投票

当在SCDF中使用Spring Cloud Stream 3.x版本时,您必须设置一个额外的属性来让SCDF知道哪些通道绑定被配置为输入和输出通道。

请看。功能性应用

特别注意以下属性。

app.time-source.spring.cloud.stream.function.bindings.timeSupplier-out-0=输出。

app.log-sink.spring.cloud.stream.function.bindings.logConsumer-in-0=input。

在您的情况下,您将不得不映射 passthroughlog-in-0passthroughlog-out-0 函数绑定到 inputoutput 分别。


0
投票

原来问题出在我的Dockerfile上。为了方便配置,我在构建参数中指定了在Docker文件中使用的jar文件。ENTRYPOINT. 为了达到这个目的,我使用了shell版的 ENTRYPOINT. 改变我的 ENTRYPOINT 到exec版本,解决了我的问题。

shell版本的 ENTRYPOINT 不能很好地使用图像参数 (docker run <image> <args>),因此SCDF无法将适当的参数传递给容器。

将我的Docker文件从:

FROM openjdk:11.0.5-jdk-slim as build
ARG JAR
ENV JAR $JAR
ADD build/libs/$JAR .
ENTRYPOINT java -jar $JAR

改为

FROM openjdk:11.0.5-jdk-slim as build
ARG JAR
ADD build/libs/$JAR program.jar
ENTRYPOINT ["java", "-jar", "program.jar"]

解决了这个问题。

© www.soinside.com 2019 - 2024. All rights reserved.