聚合来自多个不同类型的async_channel的消息以同步执行

问题描述 投票:0回答:2

我想通过同步访问我的共享内存

shared
来简化我的代码,该内存根据异步到达的请求|事件进行修改。

else::run 函数等待来自在另一个线程中运行的函数发送者的任何请求。 这些请求以不同类型

Foo
Bar
)的结构形式通过
async_channel
发送。 每个请求在共享内存上都有不同的行为。

我认为可以使用

async_channel::Receiver::recv()
将不同的调用绑定到
futures::future::select_all
,但我无法弄清楚。 绑定两个接收器允许同时等待两个接收器,并在同步的同时在第一个接收到的消息上恢复任务。

如何同时从不同类型的不同 async_channel 接收数据? (下面是最小可重现示例)

use async_channel::Sender;
use std::{thread, time};
use futures::executor::block_on;

pub struct Foo {}
pub struct Bar {}

pub mod elsewhere {
    use futures::future::select_all;
    use crate::{Foo, Bar};
    use async_channel::Receiver;
    
    enum FooBar {
        Foo(Foo),
        Bar(Bar)
    }
    
    pub async fn run(rx_foo: Receiver<Foo>, rx_bar: Receiver<Bar>) {
        println!("run is listening");
        
        let mut shared: Vec<i32> = vec![0]; // Try to avoid having a mutex on shared memory by accessing synchronously
        
        loop {
            let _recv_foo = rx_foo.recv();
            let _recv_bar = rx_bar.recv();
            
            /**
             * [...]
             * Cast Foo and Bar into a FooBar and use something like "select_all" to wait for 
             * the first received element of the two channels rx_foo & rx_bar 
             */
            
            let firstReceived = FooBar::Foo(Foo{}); // Assign a dummy value instead of the channels msg for compiling the example 
            
            match firstReceived {
                Foo => { shared.push(0);}
                Bar => { shared.clear();}
            }
        }
    }
}

async fn sender(tx_foo: Sender<Foo>, tx_bar: Sender<Bar>) {
    println!("Send to run");
    let _ = tx_foo.send(Foo{}).await;
    let _ = tx_bar.send(Bar{}).await;
}

fn main() {        
    // I want the main module to be agnostic of elsewhere::FooBar
    let (tx_foo, rx_foo) = async_channel::unbounded::<Foo>();
    let (tx_bar, rx_bar) = async_channel::unbounded::<Bar>();

    thread::spawn(move || block_on(elsewhere::run(rx_foo, rx_bar)));
    thread::spawn(move || block_on(sender(tx_foo, tx_bar)));

    thread::sleep(time::Duration::from_secs(10));
}

asynchronous rust rust-futures
2个回答
0
投票

如果我理解正确的话,你只需要

select!
而不是
select_all
。有了它,您可以像这样处理来自两个通道的接收:

select! {
    _recv_foo = rx_foo.recv() => {
        shared.push(0);
    }
    _recv_bar = rx_bar.recv() => {
        shared.clear();
    }
}

0
投票

我在

futures_lite::future::race
周围找到了解决方案。然而,它并不那么惯用。也许有人有更好的方法提供......

    async fn get_foo(rx_foo: Receiver<Foo>) -> Result<FooBar, async_channel::RecvError> {
        let foo = rx_foo.recv().await?;
        Ok(FooBar::Foo(foo))
    }
    async fn get_bar(rx_bar: Receiver<Bar>) -> Result<FooBar, async_channel::RecvError> {
        let bar = rx_bar.recv().await?;
        Ok(FooBar::Bar(bar))
    }

    pub async fn run(rx_foo: Receiver<Foo>, rx_bar: Receiver<Bar>) {
        println!("run is listening");
        let mut shared: Vec<i32> = vec![0]; // Try to avoid having a mutex on shared memory by accessing synchronously

        loop {
            let _recv_foo = get_foo(rx_foo.clone());
            let _recv_bar = get_bar(rx_bar.clone());

            let first_received = race(_recv_foo, _recv_bar).await;

            match first_received {
                Ok(FooBar::Foo(_foo)) => { shared.push(0); }
                Ok(FooBar::Bar(_bar)) => { shared.clear(); }
                Err(_e) => { println!("Houston we got a problem!"); },
            }
        }
    }
© www.soinside.com 2019 - 2024. All rights reserved.