并行运行具有不同返回类型的异步函数

问题描述 投票:0回答:1

我在

process
函数内调用了 2 个不同的异步函数。

use actix_web::Result;

pub struct A {
    // Some fields below.
}

pub struct B {
    // Some fields below.
}

pub async fn process() -> Result<ApiResult> { 
    let connection_string = env::var("DATABASE_URL").expect("env DATABASE_URL is not set");
    let pool: sqlx::Pool<sqlx::Postgres> = PgPool::connect(&connection_string)
        .await
        .expect("Problem connecting with the database");
    let a = func_a(&pool).await?;
    let b = func_b(&pool).await?;

    // Does more stuff below using `a`, and `b`. 
}

pub async fn func_a(pool: &PgPool) -> Result<A> {
    // Does some stuff below.
}

pub async fn func_b(pool: &PgPool) -> Result<B> {
    // Does some stuff below.
}

如何同时并行运行

func_a
func_b


在浏览不同的帖子和有些类似的问题时,我尝试了

tokio::task::JoinSet
示例,如下所示,但到目前为止没有运气。

pub trait CommonTrait {}

pub struct A {
    // Some fields below.
}

pub struct B {
    // Some fields below.
}

impl CommonTrait for A {}
impl CommonTrait for B {}

pub async fn process() -> Result<ApiResult> { 
    let connection_string = env::var("DATABASE_URL").expect("env DATABASE_URL is not set");
    let pool: sqlx::Pool<sqlx::Postgres> = PgPool::connect(&connection_string)
        .await
        .expect("Problem connecting with the database");
    let mut tasks = JoinSet::new();

    tasks.spawn(async { Box::new(func_a(&pool)) });
    tasks.spawn(async { Box::new(func_b(&pool)) });

    // Does more stuff below using `a`, and `b`. 
}

但是 Rust 编译会抛出:

expected `{async block@src/service.rs:34:17: 34:69}` to be a future that resolves to `Box<impl Future<Output = Result<A, Error>>>`, 
but it resolves to `Box<impl Future<Output = Result<B, Error>>>`
expected struct `tokio::task::JoinHandle<Box<impl std::future::Future<Output = Result<A, actix_web::Error>>>>`
   found struct `tokio::task::JoinHandle<Box<impl std::future::Future<Output = Result<B, actix_web::Error>>>>`
distinct uses of `impl Trait` result in different opaque types

我是 Rust 新手,对任何想法持开放态度,包括修复我现有的实现或实现完全不同的解决方案。提前致谢! 🙏

rust parallel-processing rust-tokio actix-web rust-actix
1个回答
0
投票

尝试完成你的

JoinSet
示例来检索
a
b
的值,问题应该会变得更清晰:
JoinSet
的任务的所有结果都是通过same方法检索的
 JoinSet::join_next
,所以它们必须具有相同的类型。

要同时并行运行这两个函数,您可以使用

tokio::task::spawn
,或者如果您的函数进行阻塞调用,则可以使用
tokio::task::spawn_blocking
。这看起来像这样:

let task_a = tokio::task::spawn(func_a());
let task_b = tokio::task::spawn(func_b());
// at this point, both tasks are running in parallel
let a = task_a.await?;
let b = task_b.await?;
// at this point, both tasks have produced their result
process_both(a, b);

如果相反,

a
b
需要独立处理, 你不想等待一个来处理另一个, 那么你应该将处理包含在任务中:

let task_a = tokio::task::spawn(async {
    let a = func_a().await;
    process_a(a).await
});
let task_b = tokio::task::spawn(async {
    let b = func_b().await;
    process_b(b).await
});
let a_processed = task_a.await?;
let b_processed = task_b.await?;
© www.soinside.com 2019 - 2024. All rights reserved.