camel:如何异步发送到端点

问题描述 投票:0回答:7

如何在不等待端点的路由被处理的情况下向端点发送消息(也就是说,我的路由应该只发送消息并完成)?

java apache-camel
7个回答
6
投票

wireTap(endpoint)
就是答案。


6
投票

您需要使用wireTap 或多播。 direct: 端点将修改下一步的 Exchange,无论指定什么 ExchangePattern。您可以通过使用这个失败的测试来看到:

public class StackOverflowTest extends CamelTestSupport {
    private static final String DIRECT_INPUT = "direct:input";
    private static final String DIRECT_NO_RETURN = "direct:no.return";
    private static final String MOCK_OUTPUT = "mock:output";
    private static final String FIRST_STRING = "FIRST";
    private static final String SECOND_STRING = "SECOND";

    @NotNull
    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from(DIRECT_INPUT)
                        .to(ExchangePattern.InOnly, DIRECT_NO_RETURN)
                        .to(MOCK_OUTPUT)
                        .end();

                from(DIRECT_NO_RETURN)
                        .bean(new CreateNewString())
                        .end();
            }
        };
    }

    @Test
    public void testShouldNotModifyMessage() throws JsonProcessingException, InterruptedException {
        final MockEndpoint myMockEndpoint = getMockEndpoint(MOCK_OUTPUT);
        myMockEndpoint.expectedBodiesReceived(FIRST_STRING);
        template.sendBody(DIRECT_INPUT, FIRST_STRING);
        assertMockEndpointsSatisfied();
    }

    public static class CreateNewString {
        @NotNull
        public String handle(@NotNull Object anObject) {
            return SECOND_STRING;
        }
    }
}

现在如果你将上面的内容更改为wireTap:

                from(DIRECT_INPUT)
                    .wireTap(DIRECT_NO_RETURN)
                    .to(MOCK_OUTPUT)
                    .end();

您会看到它按预期工作。您还可以使用多播:

                from(DIRECT_INPUT)
                    .multicast()
                    .to(DIRECT_NO_RETURN)
                    .to(MOCK_OUTPUT)
                    .end();

3
投票

您可以使用 ProducerTemplate 的 asyncSend() 方法将 InOnly 消息发送到端点...

template.asyncSend("direct:myInOnlyEndpoint","myMessage");

请参阅 http://camel.apache.org/async.html 了解更多详细信息


2
投票

这可能取决于您使用的端点等,但一种常见的方法是在两者之间放置一个 seda 端点是一种选择。

from("foo:bar")
  .bean(processingBean)
  .to("seda:asyncProcess") // Async send
  .bean(moreProcessingBean)

from("seda:asyncProcess")
  .to("final:endpoint"); // could be some syncrhonous endpoint that takes time to send to. http://server/heavyProcessingService or what not.

seda 端点的行为就像一个队列,先进先出。如果您将多个事件分派到 seda 端点的速度比路由完成处理它们的速度快,它们将堆积起来并等待处理,这是一个很好的行为。


2
投票

您可以在路由中使用 inOnly 仅将消息发送到端点,而不等待响应。有关更多详细信息,请参阅请求回复文档事件消息文档

from("direct:testInOnly").inOnly("mock:result");

0
投票

https://people.apache.org/~dkulp/camel/async.html

对于 InOnly 和 InOut,您都可以发送同步或异步。看起来很奇怪,你可以发送 InOnly 但异步,但最后这里解释说它等待 Camel 处理,然后触发并忘记。

异步客户端 API Camel 在 ProducerTemplate 中提供了异步客户端 API,我们在 Camel 2.0 中添加了大约 10 个新方法。我们在下表中列出了最重要的内容:

方法 退货 描述
设置ExecutorService 无效 用于设置Java ExecutorService。 Camel 默认情况下会提供一个 ScheduledExecutorService,池中有 5 个线程。
异步发送 未来 用于将异步交换发送到 Camel 端点。在任务提交给执行器服务后,Camel 会立即将控制权返回给调用者线程。这允许您在 Camel 处理其他异步线程中的交换时执行其他工作。
asyncSendBody 未来 同上,但仅用于发送正文。这是仅请求消息传递风格,因此预计不会回复。使用 InOnly 交换模式。
asyncRequestBody 未来 同上,但仅用于发送正文。这是请求回复消息传递风格,因此需要回复。使用 InOut 交换模式。
提取FutureBody T 用于使用 Java Concurrency Future 句柄从异步线程获取结果。

带有回调的异步客户端 API 除了上面的客户端 API 之外,Camel 还提供了一种在消息交换完成时使用回调的变体。

方法 退货 描述
异步回调 未来 此外,使用 org.apache.camel.spi.Synchronization Callback 将回调作为参数传入。消息交换完成后将调用回调。
asyncCallbackSendBody 未来 同上,但仅用于发送正文。这是仅请求消息传递风格,因此预计不会回复。使用 InOnly 交换模式。
asyncCallbackRequestBody 未来 同上,但仅用于发送正文。这是请求回复消息传递风格,因此需要回复。使用 InOut 交换模式。

这些方法还会返回 Future 句柄,以防您需要它们。不同之处在于,当 Exchange 完成路由时,它们也会调用回调。

未来的API java.util.concurrent.Future API 具有以下方法:

方法 退货 描述
完成 布尔值 返回一个布尔值,表示任务是否完成。如果任务由于抛出异常而失败,甚至会返回 true。
获取() 物体 获取任务的响应。如果抛出异常,则会抛出 java.util.concurrent.ExecutionException 以及引起的异常。

0
投票

您可以使用.wireTap

from("direct:start")
.wireTap("route.to.someWhere");
© www.soinside.com 2019 - 2024. All rights reserved.