如何使用wireTap传递标题?

问题描述 投票:0回答:1

现在我有以下流程:

flow -> flow.channel(some_channel())
                .....
                .gateway(anotherFlow, idempotentByHeader(OBJECT_ID_HEADER));

Consumer<GatewayEndpointSpec> idempotentByHeader(String objectIdHeader) {
    return endpointSpec -> endpointSpec.advice(idempotentByHeaderInterceptor(objectIdHeader)).errorChannel(errorChannel());
}

default IdempotentReceiverInterceptor idempotentByHeaderInterceptor(String header) {
    MessageProcessor<String> headerSelector = message -> headerExpression(header).apply(message);
    var interceptor = new IdempotentReceiverInterceptor(new MetadataStoreSelector(headerSelector, idempotencyStore()));
    interceptor.setDiscardChannel(idempotentDiscardChannel());
    return interceptor;
}

这里的问题是:

[anotherFlowMessageHandler结尾,即void,所以anotherFlow不返回任何内容。

我尝试使用以下方法:

 flow -> flow.channel(some_channel())
                    .....
                    .wireTap(anotherFlow, idempotentByHeader(OBJECT_ID_HEADER));

但是编译器由于idempotentByHeader返回类型而抱怨,所以我尝试执行以下操作:

default Consumer<WireTapSpec> idempotentByHeader(String objectIdHeader) {
    return endpointSpec -> endpointSpec.advice(idempotentByHeaderInterceptor(objectIdHeader)).errorChannel(errorChannel());
}

但是WireTapSpec没有建议方法。

如何解决?

P.S。

我能够更改idempotentByHeader的返回类型来进行书写

            .wireTap(anotherFlow)
            .enrich(idempotentByHeader(OBJECT_ID_HEADER));

但是现在应用程序由于以下原因而无法启动:

Caused by: java.lang.IllegalStateException: If the errorChannel is set, then the requestChannel must not be null
    at org.springframework.util.Assert.state(Assert.java:73)
    at org.springframework.integration.transformer.ContentEnricher.doInit(ContentEnricher.java:277)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.onInit(AbstractReplyProducingMessageHandler.java:98)
    at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:214)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1862)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1799)
    ... 42 common frames omitted
java spring-boot spring-integration spring-integration-dsl
1个回答
0
投票

确定。您缺少WireTapChannel Interceptor的事实。它不是能够接受幂等接收器拦截器的endpoint网关。

我不确定您使用idempotentByHeaderInterceptor的目标是什么,但是标题确实会传递到要发送到该WireTap的消息中。因此,您可以访问订阅此WireTap的子流中的标题。

[您最近的enrich()样本也使我有些困惑。在使用网关之前,您曾尝试避免通过idempotentByHeaderInterceptor将相同的消息发送到该子流,但是现在您无条件地发送到wireTap,并且只有在此之后才应用idempotentByHeaderInterceptor

所以,您的idempotentByHeaderInterceptor的目标是什么,您想在哪里应用?

© www.soinside.com 2019 - 2024. All rights reserved.