我已经尝试使用一些不同的库来构建通过TCP和Netty.io交付市场数据的服务器。本质上,我已经构建了一个小的POC,并且在高注入速率下遇到一些性能问题,例如64K TPS。它在32K TPS时似乎可以正常工作,但从悬崖上掉下来并有效地停在64K TPS之上。
以下时间序列图显示了在64K TPS时服务器的输入和输出消息。如您所见,在高注入速率下,它无处不在。
这里是32K TPS的类似图表
代码非常简单。我消耗了对客户的请求,将其提交给市场数据服务,该服务通过我拥有的RXJava管道将消息流式传输。该消息被编码为二进制响应,并通过TCP发送回客户端。我正在尝试找出我需要开始寻找的地方。我确实知道瓶颈在于编码方面,因为开始时进入服务器的消息数量很多。每当收到消息时,我都会刷新Channel,所以我不确定是否是引起该消息的原因。任何指导表示赞赏。
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(parentGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(port)
.childOption(ChannelOption.SO_SNDBUF, 16777216)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(16777216,33554432))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//Build Pipeline to decode, process messages and encode response
socketChannel.pipeline()
.addLast(
new FlushConsolidationHandler(1000, true),
createMessageFrameBuilder(),
new InboundMessageDecoder<>(),
new MarketDataReqResponseHandler<>(),
new MessageEncoder()
);
}
});
您很可能应该分批处理冲洗,因为flush()
昂贵。您可以做的一件事是将FlushConsolidationHandler
添加到管道中。这将确保在安全可能的情况下将刷新合并在一起,从而最大程度地减少系统调用。