我正在尝试配置一个TCP套接字,以在不同的消息中接收name,value
格式的数据。这些消息平均每秒发送一次,有时更快或更慢。
我能够设置一个有效的配置,但是我对Spring Integration中实际发生的事情缺乏基本的了解。
我的配置文件如下所示:
@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
@Bean
public IntegrationFlow server(
final CSVProcessingService csvProcessingService,
@Value("${tcp.socket.server.port}") final int port
)
{
return IntegrationFlows.from(
Tcp.inboundAdapter(
Tcp.nioServer(port)
.deserializer(serializer())
.leaveOpen(true)
)
.autoStartup(true)
.outputChannel(queueChannel())
).transform(new ObjectToStringTransformer())
.handle(csvProcessingService)
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller()
{
return Pollers.fixedDelay(50, TimeUnit.MILLISECONDS).get();
}
@Bean
public MessageChannel queueChannel()
{
return MessageChannels.queue("queue", 50).get();
}
@Bean
public ByteArrayLfSerializer serializer()
{
final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();
serializer.setMaxMessageSize(10240);
return serializer;
}
}
[CSVProcessingService
看起来像这样(缩写):
@Slf4j
@Service
public class CSVProcessingService
{
@ServiceActivator
public void process(final String message)
{
log.debug("DATA RECEIVED: \n" + message);
final CsvMapper csvMapper = new CsvMapper();
final CsvSchema csvSchema = csvMapper.schemaFor(CSVParameter.class);
if (StringUtils.contains(message, StringUtils.LF))
{
processMultiLineInput(message, csvMapper, csvSchema);
}
else
{
processSingleLineInput(message, csvMapper, csvSchema);
}
}
}
我对此配置的目标如下:
ObjectToStringTransformer
CSVProcessingService
进行进一步处理我是否正确地实现了所有这些目标,还是因为误解了Spring Integration而犯了一个错误?是否可以以某种方式组合Poller
和@ServiceActivator
?
还有,在可视化配置的IntegrationFlow如何实际“流动”时,我遇到了一个问题,也许有人可以帮助我更好地理解这一点。
不确定是什么让您感到困惑,但是您的配置和逻辑看起来不错。
[您可能会错过一个事实,您不需要在两者之间使用QueueChannel
,因为AbstractConnectionFactory.processNioSelections()
已经是多线程的,并且它计划一个任务以从套接字读取消息。因此,您只需要为Executor
配置一个合适的Tcp.nioServer()
。虽然无论如何它都是默认的Executors.newCachedThreadPool()
。
另一方面,在内存为QueueChannel
的情况下,确实可能会丢失消息,因为它们已从网络读取。
当您使用Java DSL时,应考虑在端点上使用poller()
选项。如果@Poller
属性在那上面,则@ServiceActivator
将在inputChannel
上起作用,但是在handle()
中使用相同的属性会覆盖inputChannel
,因此不会应用@Poller
。不要混淆Java DSL和注释配置的混合!
其他所有配置都很好。