We have a Spring Cloud Stream processor written in the Spring Cloud Function style. It currently runs on Spring Boot 2.1.4 with Spring Cloud Greenwich.SR1.
The processor skeleton is below:
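import java.util.function.Function;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;

@EnableBinding(Processor.class)
public class FilterProcessor {
    @Bean
    public Function<DeviceEvent, DeviceEvent> filter() {
        return deviceEvent -> {
            // process and return data (here the event is passed through unchanged)
            return deviceEvent;
        };
    }
}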
Below is the application yml configuration:
spring:
  cloud:
    stream:
      default:
        content-type: application/*+avro
        producer:
          useNativeEncoding: true
        consumer:
          useNativeEncoding: true
      function:
        definition: filter
      kafka:
        binder:
          producer-properties:
            key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://schemaregistry:8081
          consumer-properties:
            key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            specific.avro.reader: true
            schema.registry.url: http://schemaregistry:8081
After migrating from 2.1.4 to 2.2.4 / Hoxton.SR1, we changed the yml as follows, based on a suggestion in another ticket:
function:
  definition: filter
  bindings:
    filter-in-0: input
    filter-out-0: output
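All other details of the application and its configuration remain unchanged. We also removed the @EnableBinding annotation.
However, when a message is sent to the processor in the stream, the following exception occurs: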
2020-04-05 23:12:45.370 ERROR 462 --- [container-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = FTV, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1586108023129, serialized key size = -1, serialized value size = 108, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = CustomJavaObject)
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@76ec21a5]; nested exception is org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:8990": Connection refused (Connection refused); nested exception is java.net.ConnectException: Connection refused (Connection refused), failedMessage=GenericMessage [payload={"providerCode": "GTB", "customerId": "123", "type": "ASSET", "emitTime": 1585101533015, "captureTime": 1585101533015, "readTime": 1585101533015, "deviceId": "DeviceId", "data": {"battery_voltage": "13", "fmi": "4", "battery_voltage2": "21", "battery_voltage3": "13"}}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@337330a0, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=FTV, kafka_receivedTimestamp=1586108023129, kafka_groupId=fls}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1745) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1734) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1647) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1577) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1485) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1235) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:985) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:905) [spring-kafka-2.3.5.RELEASE.jar!/:2.3.5.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_192]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_192]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_192]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@76ec21a5]; nested exception is org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:8990": Connection refused (Connection refused); nested exception is java.net.ConnectException: Connection refused (Connection refused)
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:111) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:95) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
Caused by: org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:8990": Connection refused (Connection refused); nested exception is java.net.ConnectException: Connection refused (Connection refused)
at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:751) ~[spring-web-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:677) ~[spring-web-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.web.client.RestTemplate.postForEntity(RestTemplate.java:452) ~[spring-web-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient.register(DefaultSchemaRegistryClient.java:69) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar!/:2.2.1.RELEASE]
at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:308) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar!/:2.2.1.RELEASE]
at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:125) ~[spring-cloud-stream-schema-2.2.1.RELEASE.jar!/:2.2.1.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:217) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:207) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83) ~[spring-messaging-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.lambda$convertOutputValueIfNecessary$2(BeanFactoryAwareFunctionRegistry.java:620) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[na:1.8.0_192]
at java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1359) ~[na:1.8.0_192]
at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126) ~[na:1.8.0_192]
at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:498) ~[na:1.8.0_192]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:485) ~[na:1.8.0_192]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[na:1.8.0_192]
at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152) ~[na:1.8.0_192]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:1.8.0_192]
at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464) ~[na:1.8.0_192]
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.convertOutputValueIfNecessary(BeanFactoryAwareFunctionRegistry.java:626) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.doApply(BeanFactoryAwareFunctionRegistry.java:569) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.apply(BeanFactoryAwareFunctionRegistry.java:465) ~[spring-cloud-function-context-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:602) ~[spring-cloud-stream-3.0.1.RELEASE.jar!/:3.0.1.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_192]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_192]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_192]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_192]
at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:129) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:112) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:55) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:386) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:92) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:117) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:375) ~[spring-expression-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:171) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:156) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:636) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.fallbackToInvokeExpression(MessagingMethodInvokerHelper.java:629) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInvokeExceptionAndFallbackToExpressionIfAny(MessagingMethodInvokerHelper.java:613) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:584) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:477) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:355) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:108) ~[spring-integration-core-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
... 34 common frames omitted
(Updated with the stack trace of the error.)
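Any pointers for getting the processor onto the latest Boot version would help. Our custom libraries are being upgraded and use the latest features, so a processor on the old Boot version prevents some of those features from working or loading, for example when configuration properties use inner classes or constructor binding.
So any pointers toward resolving the issue would be greatly appreciated. To reiterate, the code makes no HTTP calls; the processor is just an in-out processor with a log statement.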
The stream definition is as follows:
:TopicName > Processor | Sink
Both the processor and the sink contain only log statements. The processor simply returns the event it receives.
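To make the "log and return" behaviour concrete, here is a minimal sketch of such a pass-through function using only the JDK, with no Spring on the classpath. The DeviceEvent class below is a hypothetical stand-in for the real Avro-generated type, and PassThroughFilter is an illustrative name, not the actual processor class.

```java
import java.util.function.Function;
import java.util.logging.Logger;

// Illustrative pass-through processor logic; the class name is an assumption.
final class PassThroughFilter {

    private static final Logger LOG = Logger.getLogger(PassThroughFilter.class.getName());

    private PassThroughFilter() {
    }

    // Logs the incoming event and returns the very same instance.
    static Function<DeviceEvent, DeviceEvent> filter() {
        return deviceEvent -> {
            LOG.info("Received event for device " + deviceEvent.getDeviceId());
            return deviceEvent;
        };
    }

    public static void main(String[] args) {
        DeviceEvent in = new DeviceEvent("DeviceId");
        DeviceEvent out = filter().apply(in);
        System.out.println(out == in); // prints true
    }
}

// Hypothetical stand-in for the Avro-generated DeviceEvent class.
final class DeviceEvent {

    private final String deviceId;

    DeviceEvent(String deviceId) {
        this.deviceId = deviceId;
    }

    String getDeviceId() {
        return deviceId;
    }
}
```

Nothing in this function performs I/O, which is consistent with the observation that the HTTP call in the trace comes from the framework's output conversion rather than from the processor code.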
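Summarizing for anyone who may face a similar issue. These are the changes needed when upgrading from a lower to a higher version, in my case from 2.1.4 (Greenwich.SR1) to 2.2.4 (Hoxton.SR1).
Add function bindings as shown below. The binding names follow the pattern <functionName>-<in for input, out for output>-<parameter index>.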
function:
  definition: filter
  bindings:
    filter-in-0: input
    filter-out-0: output
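The naming pattern used for the binding keys can be sketched in plain Java. BindingNames is a hypothetical helper written only to illustrate the pattern; it is not part of Spring Cloud Stream, which derives these names internally.

```java
// Hypothetical helper, not a Spring Cloud Stream API: it only illustrates
// the <functionName>-<in|out>-<index> naming pattern.
final class BindingNames {

    private BindingNames() {
    }

    // Builds a binding name such as "filter-in-0" or "filter-out-0".
    static String bindingName(String functionName, boolean input, int index) {
        if (functionName == null || functionName.isEmpty()) {
            throw new IllegalArgumentException("functionName must not be empty");
        }
        if (index < 0) {
            throw new IllegalArgumentException("index must not be negative");
        }
        return functionName + "-" + (input ? "in" : "out") + "-" + index;
    }

    public static void main(String[] args) {
        System.out.println(bindingName("filter", true, 0));  // prints filter-in-0
        System.out.println(bindingName("filter", false, 0)); // prints filter-out-0
    }
}
```

For a function named filter with one input and one output, this yields exactly the two keys mapped to input and output in the yml fragment.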
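Add a schema registry client configuration. @EnableSchemaRegistryClient alone does not seem to work: the trace shows DefaultSchemaRegistryClient posting to http://localhost:8990 instead of the configured registry, so declare the Confluent client explicitly.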
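import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.schema.client.ConfluentSchemaRegistryClient;
import org.springframework.cloud.stream.schema.client.SchemaRegistryClient;
import org.springframework.context.annotation.Bean;

// Declare inside a @Configuration class; reuses the registry URL from the binder producer properties
@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.kafka.binder.producer-properties.schema.registry.url}") String endPoint) {
    ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
    client.setEndpoint(endPoint);
    return client;
}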