将来自async_channel的查询与不同类型绑定以同步执行它们

问题描述 投票:0回答:1

我想通过同步访问我的共享内存

shared
来简化我的代码。

else::run 函数等待来自在另一个线程中运行的函数发送者的任何请求。 这些请求以不同类型

Foo
Bar
)的结构形式通过
async_channel
发送。 每个请求在共享内存上都有不同的行为。

我认为可以使用

async_channel::Receiver::recv()
将不同的调用绑定到
futures::future::select_all
,但我无法弄清楚。 绑定两个接收器允许同时等待两个接收器,并在同步的同时在第一个接收到的消息上恢复任务。

我如何设法从同一类型的不同类型的不同 async_channel 接收? (下面是最小可重现示例)

use async_channel::Sender;
use std::{thread, time};
use futures::executor::block_on;

pub struct Foo {}
pub struct Bar {}

pub mod elsewhere {
    use futures::future::select_all;
    use crate::{Foo, Bar};
    use async_channel::Receiver;
    
    enum FooBar {
        Foo(Foo),
        Bar(Bar)
    }
    
    pub async fn run(rx_foo: Receiver<Foo>, rx_bar: Receiver<Bar>) {
        println!("run is listening");
        
        let mut shared: Vec<i32> = vec![0]; // Try to avoid having a mutex on shared memory by accessing synchronously
        
        loop {
            let _recv_foo = rx_foo.recv();
            let _recv_bar = rx_bar.recv();
            
            /**
             * [...]
             * Cast Foo and Bar into a FooBar and use something like "select_all" to wait for 
             * the first received element of the two channels rx_foo & rx_bar 
             */
            
            let firstReceived = FooBar::Foo(Foo{}); // Assign a dummy value instead of the channels msg for compiling the example 
            
            match firstReceived {
                Foo => { shared.push(0);}
                Bar => { shared.clear();}
            }
        }
    }
}

async fn sender(tx_foo: Sender<Foo>, tx_bar: Sender<Bar>) {
    println!("Send to run");
    let _ = tx_foo.send(Foo{}).await;
    let _ = tx_bar.send(Bar{}).await;
}

fn main() {        
    // I want the main module to be agnostic of elsewhere::FooBar
    let (tx_foo, rx_foo) = async_channel::unbounded::<Foo>();
    let (tx_bar, rx_bar) = async_channel::unbounded::<Bar>();

    thread::spawn(move || block_on(elsewhere::run(rx_foo, rx_bar)));
    thread::spawn(move || block_on(sender(tx_foo, tx_bar)));

    thread::sleep(time::Duration::from_secs(10));
}

asynchronous rust rust-futures
1个回答
0
投票

如果我理解正确的话,你只需要

select!
而不是
select_all
。有了它,您可以像这样处理来自两个通道的接收:

select! {
    _recv_foo = rx_foo.recv() => {
        shared.push(0);
    }
    _recv_bar = rx_bar.recv() => {
        shared.clear();
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.