SCDF Spring Cloud Stream: migrating from Boot 2.1.4 to 2.2.4 breaks a Kafka Avro stream

Problem description

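We have a Spring Cloud Stream processor written with the Spring Cloud Function approach. It currently runs on Spring Boot 2.1.4 with Spring Cloud Greenwich.SR1.

The processor skeleton is below: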

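import java.util.function.Function;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;

@EnableBinding(Processor.class)
public class FilterProcessor {

    // Function bean referenced by spring.cloud.stream.function.definition
    @Bean
    public Function<DeviceEvent, DeviceEvent> filter() {
        return deviceEvent -> {
            // process the event (only a log statement in our case), then return it
            return deviceEvent;
        };
    }
}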

The application YAML configuration is as follows:

spring:
  cloud:
    stream:
      default:
        content-type: application/*+avro
        producer:
          useNativeEncoding: true
        consumer:
          useNativeEncoding: true
      function:
        definition: filter
      kafka:
        binder:
          producer-properties:
            key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://schemaregistry:8081
          consumer-properties:
            key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            specific.avro.reader: true
            schema.registry.url: http://schemaregistry:8081

After migrating from 2.1.4 to 2.2.4 / Hoxton.SR1, we made the following YAML change, as suggested in another ticket:

      function:
        definition: filter
        bindings:
          filter-in-0: input
          filter-out-0: output

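All other application and configuration details stayed the same. We also removed the @EnableBinding annotation.

However, when a message is sent to the processor in the stream, the following exception is thrown: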


2020-04-05 23:12:45.370 ERROR 462 --- [container-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = FTV, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1586108023129, serialized key size = -1, serialized value size = 108, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = CustomJavaObject)

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@76ec21a5]; nested exception is org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:8990": Connection refused (Connection refused); nested exception is java.net.ConnectException: Connection refused (Connection refused), failedMessage=GenericMessage [payload={"providerCode": "GTB", "customerId": "123", "type": "ASSET", "emitTime": 1585101533015, "captureTime": 1585101533015, "readTime": 1585101533015, "deviceId": "DeviceId", "data": {"battery_voltage": "13", "fmi": "4", "battery_voltage2": "21", "battery_voltage3": "13"}}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@337330a0, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=FTV, kafka_receivedTimestamp=1586108023129, kafka_groupId=fls}]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1745) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1734) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1647) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1577) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1485) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1235) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:985) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:905) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_192]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_192]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_192]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@76ec21a5]; nested exception is org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:8990": Connection refused (Connection refused); nested exception is java.net.ConnectException: Connection refused (Connection refused)
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:111) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:95) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]

Caused by: org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:8990": Connection refused (Connection refused); nested exception is java.net.ConnectException: Connection refused (Connection refused)
        at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:751) ~[spring-web-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:677) ~[spring-web-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.web.client.RestTemplate.postForEntity(RestTemplate.java:452) ~[spring-web-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient.register(DefaultSchemaRegistryClient.java:69) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar!/:2.2.1.RELEASE]
        at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:308) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar!/:2.2.1.RELEASE]
        at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:125) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar!/:2.2.1.RELEASE]
        at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:217) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:207) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.lambda$convertOutputValueIfNecessary$2(BeanFactoryAwareFunctionRegistry.java:620) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[na:1.8.0_192]
        at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1359) ~[na:1.8.0_192]
        at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126) ~[na:1.8.0_192]
        at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498) ~[na:1.8.0_192]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485) ~[na:1.8.0_192]
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[na:1.8.0_192]
        at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152) ~[na:1.8.0_192]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:1.8.0_192]
        at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464) ~[na:1.8.0_192]
        at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.convertOutputValueIfNecessary(BeanFactoryAwareFunctionRegistry.java:626) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
        at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.doApply(BeanFactoryAwareFunctionRegistry.java:569) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
        at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.apply(BeanFactoryAwareFunctionRegistry.java:465) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
        at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:602) ~[spring-cloud-stream-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_192]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_192]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_192]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_192]
        at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:129) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:112) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:55) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:386) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:92) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:117) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:375) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:171) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:156) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:636) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.fallbackToInvokeExpression(MessagingMethodInvokerHelper.java:629) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInvokeExceptionAndFallbackToExpressionIfAny(MessagingMethodInvokerHelper.java:613) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:584) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:477) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:355) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:108) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        ... 34 common frames omitted

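Updated with the stack trace of the error.

Any pointers for getting the processor onto the latest Boot version would help. Our custom libraries are being upgraded to use newer features, and a processor stuck on the old Boot version prevents some of them from working or loading, for example when configuration properties classes use inner classes or constructor binding.

So any pointers to resolve this would be greatly appreciated. To reiterate, the code makes no HTTP calls; the processor is a simple in-out processor with a log statement.

The stream definition is as follows:

:TopicName > Processor | Sink

Both the processor and the sink contain only log statements. The processor simply returns the event it receives.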
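
To make the pass-through behaviour concrete, here is a minimal sketch in plain Java, outside of Spring. The `DeviceEvent` class below is a hypothetical stand-in with a single field (the real class is presumably generated from the Avro schema and is not shown in the question), and `PassThroughFilter` is an illustrative name only.

```java
import java.util.function.Function;
import java.util.logging.Logger;

public class PassThroughFilter {

    // Hypothetical stand-in for the real DeviceEvent class, which is not shown in the question.
    public static final class DeviceEvent {
        private final String deviceId;

        public DeviceEvent(String deviceId) {
            this.deviceId = deviceId;
        }

        public String getDeviceId() {
            return deviceId;
        }
    }

    private static final Logger LOG = Logger.getLogger(PassThroughFilter.class.getName());

    // Same shape as the filter() bean in the question: log, then return the event unchanged.
    public static Function<DeviceEvent, DeviceEvent> filter() {
        return deviceEvent -> {
            LOG.info("Received event for device " + deviceEvent.getDeviceId());
            return deviceEvent;
        };
    }

    public static void main(String[] args) {
        DeviceEvent in = new DeviceEvent("DeviceId");
        DeviceEvent out = filter().apply(in);
        // The function hands back the very same instance it received.
        System.out.println(out == in);
    }
}
```

Nothing in this logic performs an HTTP call, which is consistent with the stack trace above: the POST originates in the schema registry message converter on the output side, not in the function body.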

spring-cloud-stream spring-cloud-dataflow
1 answer

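Just summarizing for anyone who may face a similar issue when upgrading from a lower version to a higher one; in my case, from 2.1.4 (Greenwich.SR1) to 2.2.4 (Hoxton.SR1).

Add function bindings as shown below. The binding names follow the pattern <functionName>-<in for input, out for output>-<param sequence number>: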

      function:
        definition: filter
        bindings:
          filter-in-0: input
          filter-out-0: output
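
The naming pattern can be illustrated with a small sketch. `BindingNames` and `bindingName` are hypothetical names made up for this example and are not part of Spring Cloud Stream; the code only mirrors the pattern described in this answer.

```java
public class BindingNames {

    // Builds a name following the pattern <functionName>-<in|out>-<index>.
    public static String bindingName(String functionName, boolean input, int index) {
        if (functionName == null || functionName.isEmpty()) {
            throw new IllegalArgumentException("functionName must not be empty");
        }
        if (index < 0) {
            throw new IllegalArgumentException("index must not be negative");
        }
        return functionName + "-" + (input ? "in" : "out") + "-" + index;
    }

    public static void main(String[] args) {
        System.out.println(bindingName("filter", true, 0));  // filter-in-0
        System.out.println(bindingName("filter", false, 0)); // filter-out-0
    }
}
```

For the `filter` function this gives `filter-in-0` and `filter-out-0`, the two keys that the YAML above maps to the existing `input` and `output` names.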

Add a schema registry client configuration. @EnableSchemaRegistryClient alone does not seem to work:

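import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.schema.client.ConfluentSchemaRegistryClient;
import org.springframework.cloud.stream.schema.client.SchemaRegistryClient;
import org.springframework.context.annotation.Bean;

    // Declare inside a configuration class; reuses the registry URL already set for the Kafka producer.
    @Bean
    public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url}") String endPoint) {
        ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endPoint);
        return client;
    }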