我有下面的JAVA实现来使用NIO API创建一个简单的Web服务器。
package zion
import java.net._
import java.nio.ByteBuffer
import java.nio.channels._
object NHello {
import java.nio.CharBuffer
import java.nio.charset.Charset
def helloWorldBytes: ByteBuffer = Charset
.forName("ISO-8859-1")
.newEncoder
.encode(CharBuffer.wrap(httpResponse("NHello World\n")))
def httpResponse(content: String): String = {
val rn = "\r\n"
List(
"HTTP/1.1 200 OK",
"Content-Type: text/html",
"Connection: Keep-Alive",
s"Content-Length: ${content.length()}",
rn + content
).mkString(rn)
}
def main(args: Array[String]): Unit = {
val port = 8080
val address = new InetSocketAddress(port)
// Server Socket Channel
val serverSocketChannel = ServerSocketChannel.open()
serverSocketChannel.bind(address)
serverSocketChannel.configureBlocking(false)
// Selector
val selector = Selector.open()
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)
while (true) {
selector.select()
val iterator = selector.selectedKeys().iterator()
while (iterator.hasNext) {
val key = iterator.next()
if (key.isAcceptable) {
val channel = serverSocketChannel.accept()
channel.write(helloWorldBytes)
channel.close()
}
}
iterator.remove()
}
sys.addShutdownHook({
println("Shutting down...")
serverSocketChannel.close()
})
println("Exiting...")
}
}
使用 wrk
我的请求量大约是每秒几千次。
wrk -t12 -c100 -d10s http://127.0.0.1:8080
与Netty相比,这似乎有点太慢了。而使用Netty,我的吞吐量至少能提高10~15倍。考虑到Netty也是建立在NIO之上,我到底做错了什么?
是否有一些明显的性能优化是我遗漏的?
经过进一步的搜索和分析,我终于弄清楚了上面这段代码中的所有问题。
def main(args: Array[String]): Unit = {
val port = 8080
val address = new InetSocketAddress(port)
val serverSocketChannel = ServerSocketChannel.open()
serverSocketChannel.bind(address)
serverSocketChannel.configureBlocking(false)
val selector = Selector.open()
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)
while (true) {
selector.select()
val iterator = selector.selectedKeys().iterator()
while (iterator.hasNext) {
val key = iterator.next()
if (key.isAcceptable) {
val channel = serverSocketChannel.accept()
// 1. Blocking Write
channel.write(helloWorldBytes)
// 2. Blocking Close
channel.close()
}
}
iterator.remove()
}
sys.addShutdownHook({
println("Shutting down...")
serverSocketChannel.close()
})
println("Exiting...")
}
}
主要的问题是
1. 阻止写由于阻塞写调用,除非字节写入流中,否则我无法接受更多的连接。所以这些连接就这样闲置着,从而影响了webserver的性能。
2. 阻止接近该 close
呼叫也是阻塞的,需要时间来完成。同样除非关闭连接,否则不接受新的请求,也不响应已接受的连接。
关闭连接还有一个问题。创建一个新的连接是很昂贵的 wrk
等,不要在发出一个请求后自动杀死连接。在每次请求后关闭服务器上的连接也会成为性能杀手,从而影响你的基准。
这里有一个另类的 "高性能 "实现方式
package zion
import java.io.IOException
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{
AsynchronousChannelGroup,
AsynchronousServerSocketChannel,
AsynchronousSocketChannel,
CompletionHandler
}
import java.util.concurrent.{Executors, TimeUnit}
/**
* This is potentially as fast as it can get using NIO APIs.
*/
object HelloAsyncNIO {
// Create a thread pool for the socket channel
// It would be better to have probably only one thread for events.
// That pool could be shared betwee the SocketServer and in future SocketClients.
private val group =
AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(24))
// Socket to accept connections
private val serverSocketChannel = AsynchronousServerSocketChannel.open(group)
// Port to be used to connect
private val PORT = 8081
// Flag to handle logging
private val ENABLE_LOGGING = false
/**
* Contains utilities to manage read/write on the socket channels
*/
object NIOBuffer {
def helloWorldBytes: ByteBuffer = Charset
.forName("ISO-8859-1")
.newEncoder
.encode(CharBuffer.wrap(httpResponse("NHello World\n")))
def httpResponse(content: String): String = {
val rn = "\r\n"
List(
"HTTP/1.1 200 OK",
"Content-Type: text/html",
"Connection: Keep-Alive",
s"Content-Length: ${content.length()}",
rn + content
).mkString(rn)
}
private val writeByteBuffer = ByteBuffer.wrap(helloWorldBytes)
private val readByteBuffer = ByteBuffer.allocateDirect(1024 * 2) // 2kb
def read(
socket: AsynchronousSocketChannel
)(h: CompletionHandler[Integer, AsynchronousSocketChannel]): Unit =
socket.read(readByteBuffer.duplicate(), socket, h)
def write(
socket: AsynchronousSocketChannel
)(h: CompletionHandler[Integer, AsynchronousSocketChannel]): Unit =
socket.write(writeByteBuffer.duplicate(), socket, h)
}
// Generic async completion handler
case class Handle[V, A](cb: (V, A) => Unit) extends CompletionHandler[V, A] {
override def completed(result: V, attachment: A): Unit =
cb(result, attachment)
override def failed(cause: Throwable, attachment: A): Unit = {
cause match {
case e: IOException => log(e.getMessage)
case _ => cause.printStackTrace()
}
}
}
// Logging utility
def log(input: Any*): Unit = {
if (ENABLE_LOGGING) println(input.map(_.toString).mkString(", "))
}
private val onAccept
: Handle[AsynchronousSocketChannel, AsynchronousServerSocketChannel] =
Handle[AsynchronousSocketChannel, AsynchronousServerSocketChannel](
(socket, server) => {
log("\nACCEPT")
// Accept new connections immediately
server.accept(serverSocketChannel, onAccept)
// Read from the current socket
NIOBuffer.read(socket)(onRead)
}
)
private val onRead: Handle[Integer, AsynchronousSocketChannel] =
Handle[Integer, AsynchronousSocketChannel]((bytes, socket) => {
log("READ", bytes)
// EOF, meaning connection can be closed
if (bytes == -1) socket.close()
// Some data was read and now we can respond back
else if (bytes > 0) NIOBuffer.write(socket)(onWrite)
})
private val onWrite: Handle[Integer, AsynchronousSocketChannel] =
Handle[Integer, AsynchronousSocketChannel]((bytes, socket) => {
log("WRITE", bytes)
// Read from the socket
NIOBuffer.read(socket)(onRead)
})
def main(args: Array[String]): Unit = {
// Setup socket channel
serverSocketChannel.bind(new InetSocketAddress(PORT))
serverSocketChannel.accept(serverSocketChannel, onAccept)
// Making the main thread wait
group.awaitTermination(Long.MaxValue, TimeUnit.SECONDS)
}
}