使用异步 Rust 改变向量的元素

问题描述 投票:0回答:1

假设我有一个基本结构:

pub struct Course {
    pub id : i32
}

我想修改这个结构体的内部元素。假设之前我已从数据库中读取内容并将课程读入向量中。

我怎样才能实现这个目标?我有很多这种类型的向量,例如部门、建筑物、部分等...我还用它们进行了很多 IO 绑定操作。例如,插入到数据库,或从文件中读取或在对它们进行操作时改变它们。

到目前为止,在我的项目中,我从未使用过 Arc 或 Mutex 或任何与未来相关的东西。是的,几乎我所有的函数都是异步等待的,但它们是阻塞的并且没有生成。 (我意识到在项目开始时不考虑异步是错误的,但我是 Rust 新手)

至少我想做的是:

pub async fn async_test(courses: Arc<Mutex<Vec<Course>>>) {
    let mut handles = vec![];

    let mut courses = courses.clone().lock().await.clone();
    for course in courses.iter_mut() {
        let handle = task::spawn(async move {
            println!("SPAWNED TASK FOR COURSE: {}", course.id);
            do_something(course);
        });
        handles.push(handle);
    }
    join_all(handles).await;
}

pub async fn do_something(course: &mut Course) {
    course.id = 12345;
    for i in 1..10 {
        println!("{}", i);
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
}

我收到错误:

error[E0597]: `courses` does not live long enough
  --> src/lib.rs:31:19
   |
30 |     let mut courses = courses.clone().lock().await.clone();
   |         ----------- binding `courses` declared here
31 |     for course in courses.iter_mut() {
   |                   ^^^^^^^^^^^^^^^^^^
   |                   |
   |                   borrowed value does not live long enough
   |                   argument requires that `courses` is borrowed for `'static`
...
39 | }
   | - `courses` dropped here while still borrowed

我只是错过了一些关于 Rust 异步 future 的东西,因为编译器警告似乎没有帮助,而且我在网上找不到任何帮助。

谢谢!

asynchronous rust concurrency future rust-tokio
1个回答
0
投票

按照您现在所写的方式,这并不容易解决。

tokio::task::spawn
必须是
'static
,这意味着它不能依赖于任何东西。这意味着,您不能将引用传递给它。原因是它可能在不同的线程上运行,并且这里应用与普通 Rust 线程相同的所有权规则。

最后的

join_all
实际上并没有任何意义 -
JoinHandle
返回的
spawn
实际上并不附加到任何东西。如果
async_test
任务被取消,这实际上并不会取消生成的任务,而是使引用悬空。这就是编译器所阻止的,也是为什么它不允许引用衍生任务的原因。

可悲的是,对于

Mutex
es 来说也是如此 - 在整个
Mutex
周围有一个
Vec
不足以将单个元素生成到任务中。但即使将每个元素包装在自己的
Mutex
中也是不够的 - 这里的中心问题是确实没有办法将引用传递到任务中,使得整个问题在当前布局中不可能实现。

使用普通线程,解决此问题的简单方法是

std::thread::scope
。遗憾的是,类似
async
的内容在
std
库中不存在。

tokio
中有些板条箱可以实现相同的效果,例如
async_scoped
。这就是带有
async_scoped
的解决方案的样子:

use std::sync::Arc;

use async_scoped::TokioScope;
use tokio::sync::Mutex;

pub struct Course {
    pub id: i32,
}

pub async fn async_test(courses: Arc<Mutex<Vec<Course>>>) {
    let mut courses = courses.lock().await;

    // Be aware that `scope_and_collect` is **NOT** cancellation safe.
    unsafe {
        TokioScope::scope_and_collect(|s| {
            for course in courses.iter_mut() {
                s.spawn(async move {
                    println!("SPAWNED TASK FOR COURSE: {}", course.id);
                    do_something(course).await;
                });
            }
        })
        .await
    };
}

pub async fn do_something(course: &mut Course) {
    course.id = 12345;
    for i in 1..10 {
        println!("{}", i);
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    }
}

请注意,(一般情况下)没有取消安全的方法来实现这样的范围;这就是为什么这个函数需要一个

unsafe
围绕它。

实现这一目标的更安全方法需要在没有参考的情况下进行架构返工;通过将所有权传递给生成的任务,或者使用许多

Arc

© www.soinside.com 2019 - 2024. All rights reserved.