是否可以在单独的线程中启动此工作程序而不是阻塞主线程?

问题描述 投票:0回答:1

我有一个 Rust 应用程序,它基本上是一个带有一些路由的 axum Web 服务器。

但是现在我需要定期检查数据库表中是否有一些新行。

如果有的话,我需要进行一些可能持续几分钟的繁重计算(特别是在尺寸较小的 Docker 容器中)。

我使用的代码如下,输出为:

The main thread should not be blocked and should print every second: Instant { t: 6479.5889821s }
The main thread should not be blocked and should print every second: Instant { t: 6480.5996495s }
The main thread should not be blocked and should print every second: Instant { t: 6481.6152853s }
Starting an heavy CPU computation...
The main thread should not be blocked and should print every second: Instant { t: 6502.5748215s }
The main thread should not be blocked and should print every second: Instant { t: 6503.5917731s }
The main thread should not be blocked and should print every second: Instant { t: 6504.5990575s }

如您所见,

Starting an heavy CPU computation...
阻塞了主线程。

有办法避免这种情况吗?

我应该对每项繁重的工作使用

tokio::task::spawn_blocking()
吗?

我可以在单独的线程中启动整个“worker”吗?因为我有很多不同的工作。

我的意思是

main()
中的这段代码:

let worker = Queue::new();

tokio::spawn(async move { worker.run().await }); // Is there a way to launch this in a separate thread?

代码在这里:Rust Playground

或者在这里:Rust Explorer

或者在这里:

use futures::StreamExt;
use rand::Rng;
use std::time::{Duration, Instant};

const CONCURRENCY: usize = 5;

struct Job {
    id: u16,
}

struct Queue {
    // some needed fields like DB connection
}

impl Queue {
    fn new() -> Self {
        Self {}
    }

    async fn run(&self) {
        loop {
            // I'll get jobs from DB here; for this demo are random generated

            let mut jobs: Vec<Job> = Vec::new();

            for _ in 0..2 {
                jobs.push(Job {
                    id: get_random_id(),
                })
            }

            futures::stream::iter(jobs)
                .for_each_concurrent(CONCURRENCY, |job| async {
                    match self.handle_job(job).await {
                        Ok(_) => {
                            // I will remove the job from queue
                        }
                        Err(_) => {
                            // I will handle this error
                        }
                    };
                })
                .await;

            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    }

    async fn handle_job(&self, job: Job) -> Result<(), String> {
        if job.id % 2 == 0 {
            println!("Starting an heavy CPU computation...");

            // I'm simulating heavy CPU computations with this sleep thread blocking here
            std::thread::sleep(Duration::from_secs(10));

            // I think I can use spawn_blocking instead, right?

            // tokio::task::spawn_blocking(move || {
            //     std::thread::sleep(Duration::from_secs(8));
            // }).await.unwrap()
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let worker = Queue::new();

    // Can I start the below worker.run() in a separate thread?

    tokio::spawn(async move { worker.run().await });

    loop {
        println!(
            "The main thread should not be blocked and should print every second: {:?}",
            Instant::now()
        );
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

fn get_random_id() -> u16 {
    let mut rng = rand::thread_rng();

    rng.gen::<u16>()
}
multithreading rust blocking rust-tokio spawn
1个回答
0
投票

是的,您应该使用

spawn_blocking
。这将确保 main 中的异步循环不会被 CPU 密集型代码阻塞。

Tokio 有许多专门用于此目的的阻塞线程,虽然如果您无限制地运行 CPU 繁重的任务可能会出现问题,但您正在使用

for_each_concurrent
对并行性施加自己的限制,因此这不是问题。

© www.soinside.com 2019 - 2024. All rights reserved.