如何使用QueueChannel和ServiceActivator正确配置TCP inboundAdapter

问题描述 投票:2回答:1

我正在尝试配置一个TCP套接字,以在不同的消息中接收name,value格式的数据。这些消息平均每秒发送一次,有时更快或更慢。

我能够设置一个有效的配置,但是我对Spring Integration中实际发生的事情缺乏基本的了解。

我的配置文件如下所示:

@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
    @Bean
    public IntegrationFlow server(
        final CSVProcessingService csvProcessingService,
        @Value("${tcp.socket.server.port}") final int port
    )
    {
        return IntegrationFlows.from(
            Tcp.inboundAdapter(
                Tcp.nioServer(port)
                   .deserializer(serializer())
                   .leaveOpen(true)
            )
               .autoStartup(true)
               .outputChannel(queueChannel())
        ).transform(new ObjectToStringTransformer())
         .handle(csvProcessingService)
         .get();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller()
    {
        return Pollers.fixedDelay(50, TimeUnit.MILLISECONDS).get();
    }

    @Bean
    public MessageChannel queueChannel()
    {
        return MessageChannels.queue("queue", 50).get();
    }

    @Bean
    public ByteArrayLfSerializer serializer()
    {
        final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();

        serializer.setMaxMessageSize(10240);

        return serializer;
    }
}

[CSVProcessingService看起来像这样(缩写):

@Slf4j
@Service
public class CSVProcessingService
{
    @ServiceActivator
    public void process(final String message)
    {
        log.debug("DATA RECEIVED: \n" + message);
        final CsvMapper csvMapper = new CsvMapper();
        final CsvSchema csvSchema = csvMapper.schemaFor(CSVParameter.class);

        if (StringUtils.contains(message, StringUtils.LF))
        {
            processMultiLineInput(message, csvMapper, csvSchema);
        }
        else
        {
            processSingleLineInput(message, csvMapper, csvSchema);
        }
    }
}

我对此配置的目标如下:

  • 在配置的端口上接收消息
  • 承受更高的负载而不会丢失消息
  • 反序列化消息
  • 将它们放入队列通道
  • (最好也是日志错误)
  • 每50毫秒对队列通道进行一次轮询,并将来自队列通道的消息传递到ObjectToStringTransformer
  • 在将转换后的消息传递给转换器之后,将其传递到CSVProcessingService进行进一步处理

我是否正确地实现了所有这些目标,还是因为误解了Spring Integration而犯了一个错误?是否可以以某种方式组合Poller@ServiceActivator

还有,在可视化配置的IntegrationFlow如何实际“流动”时,我遇到了一个问题,也许有人可以帮助我更好地理解这一点。

java spring spring-integration spring-integration-dsl
1个回答
0
投票

不确定是什么让您感到困惑,但是您的配置和逻辑看起来不错。

[您可能会错过一个事实,您不需要在两者之间使用QueueChannel,因为AbstractConnectionFactory.processNioSelections()已经是多线程的,并且它计划一个任务以从套接字读取消息。因此,您只需要为Executor配置一个合适的Tcp.nioServer()。虽然无论如何它都是默认的Executors.newCachedThreadPool()

另一方面,在内存为QueueChannel的情况下,确实可能会丢失消息,因为它们已从网络读取。

当您使用Java DSL时,应考虑在端点上使用poller()选项。如果@Poller属性在那上面,则@ServiceActivator将在inputChannel上起作用,但是在handle()中使用相同的属性会覆盖inputChannel,因此不会应用@Poller。不要混淆Java DSL和注释配置的混合!

其他所有配置都很好。

© www.soinside.com 2019 - 2024. All rights reserved.