为什么下面的Java NIO API与netty相比会这么慢?

问题描述 投票:0回答:1

我用 Java NIO API 写了下面这个简单的 Web 服务器实现(代码为 Scala)。

package zion

import java.net._
import java.nio.ByteBuffer
import java.nio.channels._

/**
  * Minimal single-threaded HTTP responder built on a non-blocking
  * `ServerSocketChannel` and a `Selector`.
  *
  * Every accepted connection immediately receives the same canned response and
  * is then closed; the request itself is never read.
  */
object NHello {

  import java.io.IOException
  import java.nio.CharBuffer
  import java.nio.charset.StandardCharsets

  /**
    * Encodes the canned HTTP response as ISO-8859-1.
    *
    * @return a fresh buffer positioned at 0 and ready to be written to a channel
    */
  def helloWorldBytes: ByteBuffer = StandardCharsets.ISO_8859_1
    .newEncoder()
    .encode(CharBuffer.wrap(httpResponse("NHello World\n")))

  /**
    * Builds a complete HTTP/1.1 200 response (status line, headers, blank line, body).
    *
    * `Content-Length` is the character count of `content`; that equals the byte
    * count only for single-byte encodings such as the ISO-8859-1 used by
    * [[helloWorldBytes]].
    *
    * @param content the response body
    * @return the response text with CRLF line endings
    */
  def httpResponse(content: String): String = {
    val rn = "\r\n"
    List(
      "HTTP/1.1 200 OK",
      "Content-Type: text/html",
      "Connection: Keep-Alive",
      s"Content-Length: ${content.length()}",
      rn + content
    ).mkString(rn)
  }

  /**
    * Binds to port 8080 and serves the canned response forever.
    *
    * @param args ignored
    */
  def main(args: Array[String]): Unit = {
    val port    = 8080
    val address = new InetSocketAddress(port)

    // Server Socket Channel
    val serverSocketChannel = ServerSocketChannel.open()
    serverSocketChannel.bind(address)
    serverSocketChannel.configureBlocking(false)

    // Selector
    val selector = Selector.open()
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)

    // Must be registered BEFORE the endless loop; placed after it the hook is
    // unreachable and would never be installed.
    sys.addShutdownHook {
      println("Shutting down...")
      serverSocketChannel.close()
    }

    while (true) {
      selector.select()
      val iterator = selector.selectedKeys().iterator()
      while (iterator.hasNext) {
        val key = iterator.next()
        // Remove each key as it is handled. The selector never clears the
        // selected-key set itself, and calling remove() without a preceding
        // next() throws IllegalStateException.
        iterator.remove()
        if (key.isValid && key.isAcceptable) {
          // accept() returns null on a non-blocking server channel when no
          // connection is pending (e.g. a spurious wake-up).
          Option(serverSocketChannel.accept()).foreach { channel =>
            try {
              val response = helloWorldBytes
              // A single write() is not guaranteed to drain the buffer.
              while (response.hasRemaining) channel.write(response)
            } catch {
              // One misbehaving client must not take the whole server down.
              case e: IOException =>
                Console.err.println(s"Failed to write response: ${e.getMessage}")
            } finally channel.close()
          }
        }
      }
    }
  }
}

使用 wrk 我的请求量大约是每秒几千次。

wrk -t12 -c100 -d10s http://127.0.0.1:8080

与Netty相比,这似乎有点太慢了。而使用Netty,我的吞吐量至少能提高10~15倍。考虑到Netty也是建立在NIO之上,我到底做错了什么?

是否有一些明显的性能优化是我遗漏的?

java scala performance nio
1个回答
0
投票

经过进一步的搜索和分析,我终于弄清楚了上面这段代码中的所有问题。

// Annotated copy of the question's entry point, kept as-is to show where the
// time goes. Binds a non-blocking server channel to port 8080, then accepts,
// answers and closes one connection at a time on the selector thread.
def main(args: Array[String]): Unit = {
    val port    = 8080
    val address = new InetSocketAddress(port)

    // Only the listening channel is switched to non-blocking mode.
    val serverSocketChannel = ServerSocketChannel.open()
    serverSocketChannel.bind(address)
    serverSocketChannel.configureBlocking(false)

    // The selector watches the listening channel for pending connections only.
    val selector = Selector.open()
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)

    while (true) {
      selector.select()
      val iterator = selector.selectedKeys().iterator()
      while (iterator.hasNext) {
        val key = iterator.next()
        if (key.isAcceptable) {
          // The accepted channel is never configured as non-blocking here, so
          // the two calls below run synchronously on this (the only) thread.
          val channel = serverSocketChannel.accept()
          // 1. Blocking Write
          channel.write(helloWorldBytes)
          // 2. Blocking Close
          channel.close()
        }

      }
      // NOTE(review): remove() is called once, after the inner loop, so it only
      // removes the last key returned by next(); it is not called per key.
      iterator.remove()
    }

    // NOTE(review): everything below follows `while (true)` and is unreachable,
    // so the shutdown hook is never registered.
    sys.addShutdownHook({
      println("Shutting down...")
      serverSocketChannel.close()
    })

    println("Exiting...")
  }
}

主要的问题是

1. 阻塞写入:由于 write 调用是阻塞的,在字节全部写入流之前,我无法接受更多的连接。所以这些连接就这样闲置着,从而影响了 Web 服务器的性能。

2. 阻塞关闭:close 调用也是阻塞的,需要时间来完成。同样,在连接关闭之前,既不会接受新的请求,也不会响应已接受的连接。

关闭连接还有一个问题:创建新连接的开销很大,而 wrk 等工具在发出一个请求后并不会自动断开连接。在每次请求后由服务器关闭连接也会成为性能杀手,从而影响你的基准测试结果。

下面是另一种“高性能”的实现方式:

package zion

import java.io.IOException
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{
  AsynchronousChannelGroup,
  AsynchronousServerSocketChannel,
  AsynchronousSocketChannel,
  CompletionHandler
}
import java.util.concurrent.{Executors, TimeUnit}

/**
  * Hello-world HTTP responder built on the JDK asynchronous channel API
  * (`AsynchronousServerSocketChannel` / `AsynchronousSocketChannel`).
  *
  * Each connection loops read -> write -> read until the peer closes it, so
  * keep-alive clients reuse their connection. Request bytes are read but never
  * parsed: any non-empty read is answered with the same canned response.
  */
object HelloAsyncNIO {
  // Create a thread pool for the socket channel
  // It would be better to have probably only one thread for events.
  // That pool could be shared between the SocketServer and in future SocketClients.
  private val group =
    AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(24))

  // Socket to accept connections
  private val serverSocketChannel = AsynchronousServerSocketChannel.open(group)

  // Port to be used to connect
  private val PORT = 8081

  // Flag to handle logging
  private val ENABLE_LOGGING = false

  /**
    * Contains utilities to manage read/write on the socket channels
    */
  object NIOBuffer {
    // Imported locally: the file-level imports do not bring these into scope.
    import java.nio.CharBuffer
    import java.nio.charset.Charset

    /**
      * Encodes the canned HTTP response as ISO-8859-1.
      *
      * @return a fresh buffer positioned at 0
      */
    def helloWorldBytes: ByteBuffer = Charset
      .forName("ISO-8859-1")
      .newEncoder
      .encode(CharBuffer.wrap(httpResponse("NHello World\n")))

    /**
      * Builds a complete HTTP/1.1 200 response with CRLF line endings.
      *
      * `Content-Length` is the character count of `content`, which matches the
      * byte count for the single-byte encoding used by [[helloWorldBytes]].
      *
      * @param content the response body
      */
    def httpResponse(content: String): String = {
      val rn = "\r\n"
      List(
        "HTTP/1.1 200 OK",
        "Content-Type: text/html",
        "Connection: Keep-Alive",
        s"Content-Length: ${content.length()}",
        rn + content
      ).mkString(rn)
    }

    // Encoded once. `ByteBuffer.wrap` only accepts a byte array, and the encoder
    // already returns a ByteBuffer, so it is used directly.
    private val writeByteBuffer = helloWorldBytes

    // One 2 KiB scratch area shared by every connection: duplicate() gives each
    // read its own position/limit but the same memory. That is acceptable only
    // because the request bytes are never inspected.
    private val readByteBuffer = ByteBuffer.allocateDirect(1024 * 2) // 2kb

    /**
      * Starts an asynchronous read on `socket`.
      *
      * @param socket channel to read from; also passed to `h` as the attachment
      * @param h      invoked with the number of bytes read (-1 on end of stream)
      */
    def read(
        socket: AsynchronousSocketChannel
    )(h: CompletionHandler[Integer, AsynchronousSocketChannel]): Unit =
      socket.read(readByteBuffer.duplicate(), socket, h)

    /**
      * Asynchronously writes the whole canned response to `socket`.
      *
      * A single asynchronous write may transfer only part of the buffer, so the
      * write is re-issued until nothing remains. `h` is completed exactly once,
      * with the total number of bytes written, or failed on the first error.
      *
      * @param socket channel to write to; also passed to `h` as the attachment
      * @param h      invoked once the full response has been written
      */
    def write(
        socket: AsynchronousSocketChannel
    )(h: CompletionHandler[Integer, AsynchronousSocketChannel]): Unit = {
      val pending = writeByteBuffer.duplicate()
      socket.write(
        pending,
        socket,
        new CompletionHandler[Integer, AsynchronousSocketChannel] {
          private var total = 0

          override def completed(
              written: Integer,
              channel: AsynchronousSocketChannel
          ): Unit = {
            total += written.intValue
            if (pending.hasRemaining) channel.write(pending, channel, this)
            else h.completed(Integer.valueOf(total), channel)
          }

          override def failed(
              cause: Throwable,
              channel: AsynchronousSocketChannel
          ): Unit = h.failed(cause, channel)
        }
      )
    }
  }

  /**
    * Generic async completion handler that forwards success to `cb`.
    *
    * On failure the cause is logged (I/O errors) or printed (anything else). If
    * the attachment is a client socket it is closed so that a failed connection
    * does not leak; other attachments (the server channel) are left open.
    *
    * @param cb callback receiving the operation result and the attachment
    */
  case class Handle[V, A](cb: (V, A) => Unit) extends CompletionHandler[V, A] {
    override def completed(result: V, attachment: A): Unit =
      cb(result, attachment)

    override def failed(cause: Throwable, attachment: A): Unit = {
      cause match {
        case e: IOException => log(e.getMessage)
        case _              => cause.printStackTrace()
      }
      attachment match {
        case client: AsynchronousSocketChannel =>
          try client.close()
          catch { case e: IOException => log(e.getMessage) }
        case _ => ()
      }
    }
  }

  /** Prints the arguments, comma-separated, when `ENABLE_LOGGING` is set. */
  def log(input: Any*): Unit = {
    if (ENABLE_LOGGING) println(input.map(_.toString).mkString(", "))
  }

  // Accept handler: re-arms accept first, then starts reading from the new socket.
  private val onAccept
      : Handle[AsynchronousSocketChannel, AsynchronousServerSocketChannel] =
    Handle[AsynchronousSocketChannel, AsynchronousServerSocketChannel](
      (socket, server) => {
        log("\nACCEPT")

        // Accept new connections immediately
        server.accept(server, onAccept)

        // Read from the current socket
        NIOBuffer.read(socket)(onRead)
      }
    )

  // Read handler: closes on end of stream, otherwise answers with the canned response.
  private val onRead: Handle[Integer, AsynchronousSocketChannel] =
    Handle[Integer, AsynchronousSocketChannel]((bytes, socket) => {
      log("READ", bytes)

      val count = bytes.intValue

      // EOF, meaning connection can be closed
      if (count == -1) socket.close()

      // Some data was read and now we can respond back
      else if (count > 0) NIOBuffer.write(socket)(onWrite)

    })

  // Write handler: once the response is out, wait for the next request on the same socket.
  private val onWrite: Handle[Integer, AsynchronousSocketChannel] =
    Handle[Integer, AsynchronousSocketChannel]((bytes, socket) => {
      log("WRITE", bytes)

      // Read from the socket
      NIOBuffer.read(socket)(onRead)
    })

  /**
    * Binds the server channel to `PORT`, starts accepting connections and
    * parks the main thread until the channel group terminates.
    *
    * @param args ignored
    */
  def main(args: Array[String]): Unit = {

    // Setup socket channel
    serverSocketChannel.bind(new InetSocketAddress(PORT))
    serverSocketChannel.accept(serverSocketChannel, onAccept)

    // Making the main thread wait
    group.awaitTermination(Long.MaxValue, TimeUnit.SECONDS)
  }
}

© www.soinside.com 2019 - 2024. All rights reserved.