这是我的问题How to implement simple echo socket service in Spring Integration DSL的变种。引入了良好的工作解决方案,但我想探索替代方案。特别是我对基于在客户端和服务器实现中明确使用入站和出站通道的解决方案感兴趣。那可能吗?
到目前为止,我能够提出:
Heartbeat ClientConfig
...
@Bean
public IntegrationFlow heartbeatClientFlow(
TcpNetClientConnectionFactory clientConnectionFactory,
MessageChannel outboundChannel,
PollableChannel inboundChannel) {
return IntegrationFlows
.from(outboundChannel)
.handle(Tcp.outboundGateway(clientConnectionFactory))
.channel(inboundChannel)
.get();
}
...
心跳客户端
public HeartbeatClient(MessageChannel outboudChannel, PollableChannel inboundChannel) {
this.inboundChannel = inboundChannel;
this.outboudChannel = outboudChannel;
}
...
void run() {
// ..in scheduled intervals in loop
outboudChannel.send(new GenericMessage<String>("status"));
Message<?> message = inboundChannel.receive(1000);
}
客户端部分似乎工作正常。问题出在服务器端。
HeartbeatServer
public HeartbeatServer(PollableChannel inboundChannel, MessageChannel outboudChannel) {
this.inboundChannel = inboundChannel;
this.outboudChannel = outboudChannel;
}
...
void run() {
// ..in some kind of loop
Message<?> message = inboundChannel.receive(1000); // presumably a blocking call
...
outboudChannel.send(new GenericMessage<>("OK"));
...
}
HeartbeatServerConfig 这是最棘手的部分,我确信我错了。我只是不知道该怎么做。在这里,我天真地使用来自客户端实现的逆向方法,它似乎在起作用;在Flow定义中切换入站和出站通道的意义上的反转。
...
@Bean
public IntegrationFlow heartbeatServerFlow(
MessageChannel outboundChannel,
PollableChannel inboundChannel) {
return IntegrationFlows
.from(inboundChannel)
.handle(Tcp.inboundGateway(Tcp.netServer(7777)))
.channel(outboundChannel)
.get();
}
...
服务器不起作用,抛出关于Found ambiguous parameter type [class java.lang.Boolean] for method match ...
的神秘异常,然后是一长串的Spring和Spring Integration方法。
您无法使用频道启动服务器端流程。
流程从网关开始;它处理所有套接字通信。何时收到消息,它将其发送到通道。
你可以这样做......
@Bean
public IntegrationFlow server(PollableChannel requests, MessageChannel replies) {
return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1234))
.replyChannel(replies))
.transform(Transformers.objectToString())
.channel(requests)
.get();
}
但我会问为什么你会想要,因为现在你必须管理你自己的线程从请求通道接收并写入回复通道。为了使其起作用,必须将请求消息中的replyChannel
头复制到回复消息中。事实上,你真的不需要回复频道;您可以直接将回复发送到replyChannel
标头(这就是内部发生的情况,我们将回复通道桥接到标头通道)。
在网关的线程上处理请求要简单得多。
只是为了补充Gary的完美答案,如果有人有兴趣,这里是完整的代码。
我必须明确指定TcpNetServerConnectionFactory
,将ByteArrayLengthHeaderSerializer
设置为序列化器/解串器。没有它它没有用。
Heartbeat ServerConfig qazxsw poi
full code
HeartbeatServer @Bean
public TcpNetServerConnectionFactory connectionFactory() {
TcpNetServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(7777);
connectionFactory.setSerializer(new ByteArrayLengthHeaderSerializer());
connectionFactory.setDeserializer(new ByteArrayLengthHeaderSerializer());
return connectionFactory;
}
@Bean
public IntegrationFlow heartbeatServerFlow(
TcpNetServerConnectionFactory connectionFactory,
PollableChannel inboundChannel,
MessageChannel outboundChannel) {
return IntegrationFlows.from(Tcp.inboundGateway(connectionFactory)
.replyChannel(outboundChannel))
.channel(inboundChannel)
.get();
}
full code
关键的一点当然是从请求消息本身获取出站信道:public void start() {
Executors.newSingleThreadExecutor().execute(() -> {
while (true) {
try {
Message<?> request = inboundChannel.receive();
if (request == null) {
log.error("Heartbeat timeouted");
} else {
MessageChannel outboudChannel = (MessageChannel)request.getHeaders().getReplyChannel();
String requestPayload = new String((byte[]) request.getPayload());
if (requestPayload.equals("status")) {
log.info("Heartbeat received");
outboudChannel.send(new GenericMessage<>("OK"));
} else {
log.error("Unexpected message content from client: " + requestPayload);
}
}
} catch (Exception e) {
log.error(e);
}
}
});
}
完整代码可以在这里找到。