非阻塞I/O和Kotlin协程有什么关系?

问题描述 投票:0回答:2

Kotlin 协程和非阻塞 I/O 之间有什么关系?其中之一是否暗示着另一个?如果我使用阻塞 I/O 会发生什么?这对性能有何影响?

kotlin kotlin-coroutines nonblocking ktor ktor-client
2个回答
5
投票

协程被设计为包含非阻塞(即CPU 限制)代码。这就是为什么默认协程调度程序 – Dispatchers.Default – 总共有

max(2, num_of_cpus)
线程来执行调度的协程。例如,默认情况下,高度并发的程序(例如在具有 2 个 CPU 的计算机中运行的 Web 服务器)的计算能力会下降 50%,同时线程会阻塞等待 I/O 在协程中完成。

但非阻塞 I/O 并不是协程的一个特性。协程只是提供了一种更简单的编程模型,其中包含 挂起函数 ,而不是 Java 中难以阅读的 CompletableFuture 延续,以及 结构化并发 等概念。


要了解协程和非阻塞 I/O 如何协同工作,这里有一个实际示例:

server.js: 一个简单的 Node.js HTTP 服务器,接收请求,然后返回响应

~5s

const { createServer } = require("http");

let reqCount = 0;
const server = createServer(async (req, res) => {
    const { method, url } = req;
    const reqNumber = ++reqCount;
    console.log(`${new Date().toISOString()} [${reqNumber}] ${method} ${url}`);
    
    await new Promise((resolve) => setTimeout(resolve, 5000));
    res.end("Hello!\n");
    console.log(`${new Date().toISOString()} [${reqNumber}] done!`);
});

server.listen(8080);
console.log("Server started!");

main.kt: 使用三种实现向 Node.js 服务器发送 128 个 HTTP 请求:

1.

withJdkClientBlocking()
:在由 Dispatchers.IO 调度的协程内调用 JDK11 java.net.http.HttpClient 的阻塞 I/O 方法。

import java.net.URI import java.net.http.HttpClient as JDK11HttpClient import java.net.http.HttpRequest as JDK11HttpRequest import java.net.http.HttpResponse as JDK11HttpResponse import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext fun withJdkClientBlocking() { println("Running with JDK11 client using blocking send()") val client = JDK11HttpClient.newHttpClient() runExample { // Sometimes you can't avoid coroutines with blocking I/O methods. // These must be always be dispatched by Dispatchers.IO. withContext(Dispatchers.IO) { // Kotlin compiler warns this is a blocking I/O method. val response = client.send( JDK11HttpRequest.newBuilder(URI("http://localhost:8080")).build(), JDK11HttpResponse.BodyHandlers.ofString() ) // Return status code. response.statusCode() } } }

2. withJdkClientNonBlocking()

:调用 JDK11 
java.net.HttpClient
 非阻塞 I/O 方法。这些方法返回一个 
CompletableFuture<T>
,其结果使用来自 
kotlinx-coroutines-jdk8CompletionStage.await() 互操作性扩展函数来使用。尽管 I/O 不会阻塞任何线程,但异步请求/响应编组/解组在 Java Executor 上运行,因此该示例使用单线程执行器来说明单个线程如何处理许多并发请求,因为非阻塞 I/O。

import java.net.URI import java.net.http.HttpClient as JDK11HttpClient import java.net.http.HttpRequest as JDK11HttpRequest import java.net.http.HttpResponse as JDK11HttpResponse import java.util.concurrent.Executors import kotlinx.coroutines.future.await fun withJdkClientNonBlocking() { println("Running with JDK11 client using non-blocking sendAsync()") val httpExecutor = Executors.newSingleThreadExecutor() val client = JDK11HttpClient.newBuilder().executor(httpExecutor).build() try { runExample { // We use `.await()` for interoperability with `CompletableFuture`. val response = client.sendAsync( JDK11HttpRequest.newBuilder(URI("http://localhost:8080")).build(), JDK11HttpResponse.BodyHandlers.ofString() ).await() // Return status code. response.statusCode() } } finally { httpExecutor.shutdown() } }

3. withKtorHttpClient()

 使用 
Ktor,一个用 Kotlin 和协程编写的非阻塞 I/O HTTP 客户端。

import io.ktor.client.engine.cio.CIO import io.ktor.client.HttpClient as KtorClient import io.ktor.client.request.get import io.ktor.client.statement.HttpResponse as KtorHttpResponse fun withKtorHttpClient() { println("Running with Ktor client") // Non-blocking I/O does not imply unlimited connections to a host. // You are still limited by the number of ephemeral ports (an other limits like file descriptors). // With no configurable thread limit, you can configure the max number of connections. // Note that HTTP/2 allows concurrent requests with a single connection. KtorClient(CIO) { engine { maxConnectionsCount = 128 } }.use { client -> runExample { // KtorClient.get() is a suspend fun, so suspension is implicit here val response = client.get<KtorHttpResponse>("http://localhost:8080") // Return status code. response.status.value } } }
把它们放在一起:

import kotlin.system.measureTimeMillis import kotlinx.coroutines.Deferred import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.runBlocking fun runExample(block: suspend () -> Int) { var successCount = 0 var failCount = 0 Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { dispatcher -> measureTimeMillis { runBlocking(dispatcher) { val responses = mutableListOf<Deferred<Int>>() repeat(128) { responses += async { block() } } responses.awaitAll().forEach { if (it in 200..399) { ++successCount } else { ++failCount } } } }.also { println("Successfully sent ${success + fail} requests in ${it}ms: $successCount were successful and $failCount failed.") } } } fun main() { withJdkClientBlocking() withJdkClientNonBlocking() withKtorHttpClient() }


运行示例:

main.kt(用 # comments

 进行澄清)

# There were ~6,454ms of overhead in this execution Running with JDK11 client using blocking send() Successfully sent 128 requests in 16454ms: 128 were successful and 0 failed. # There were ~203ms of overhead in this execution Running with JDK11 client using non-blocking sendAsync() Successfully sent 128 requests in 5203ms: 128 were successful and 0 failed. # There were ~862ms of overhead in this execution Running with Ktor client Successfully sent 128 requests in 5862ms: 128 were successful and 0 failed.

server.js(用 # comments

 进行澄清)

# These are the requests from JDK11's HttpClient blocking I/O. # Notice how we only receive 64 requests at a time. # This is because Dispatchers.IO has a limit of 64 threads by default, so main.kt can't send anymore requests until those are done and the Dispatchers.IO threads are released. 2022-07-24T17:59:29.107Z [1] GET / (...) 2022-07-24T17:59:29.218Z [64] GET / 2022-07-24T17:59:34.124Z [1] done! (...) 2022-07-24T17:59:34.219Z [64] done! 2022-07-24T17:59:35.618Z [65] GET / (...) 2022-07-24T17:59:35.653Z [128] GET / 2022-07-24T17:59:40.624Z [65] done! (...) 2022-07-24T17:59:40.655Z [128] done! # These are the requests from JDK11's HttpClient non-blocking I/O. # Notice how we receive all 128 requests at once. 2022-07-24T17:59:41.163Z [129] GET / (...) 2022-07-24T17:59:41.257Z [256] GET / 2022-07-24T17:59:46.170Z [129] done! (...) 2022-07-24T17:59:46.276Z [256] done! # These are there requests from Ktor's HTTP client non-blocking I/O. # Notice how we also receive all 128 requests at once. 2022-07-24T17:59:46.869Z [257] GET / (...) 2022-07-24T17:59:46.918Z [384] GET / 2022-07-24T17:59:51.874Z [257] done! (...) 2022-07-24T17:59:51.921Z [384] done!
    

0
投票
有两个方面:

  • 如果您使用非阻塞服务器来公开您的API 更多详细信息请参见:

    https://blog.allegro.tech/2020/02/webflux-and-coroutines.html 或在这里:https://medium.com/@akarsh7791/non-blocking-i-o-with-netty-32ef20ab4b79

  • 如果您在处理这些请求的代码中使用非阻塞 http 客户端或数据库驱动程序(WebClient、apache 异步 http 客户端等)。

只做其中一件事情(例如协程+webflux+netty,但在幕后阻止数据库驱动程序)超出了目的

还有我的一个问题: 您是否尝试过将 KTOR 与其他非阻塞引擎(例如 WebClient)一起使用?当 DNS 将地址解析为多个 IP 时,我们与 CIO 发生了问题。

© www.soinside.com 2019 - 2024. All rights reserved.