tokio mpsc 频道意外关闭

问题描述 投票:0回答:1

我正在尝试使用

tokio::mpsc::channel
将数据从同步函数发送到 tokio 线程以异步处理它。

由于

tokio::mpsc::channel
是一个异步函数,因此我从同步函数生成一个运行时来创建 rx 和 tx,并在将 rx 移动到其中新生成的任务后返回 tx。

但是,并没有达到我的预期,我进行了一些调试,发现了以下情况。

  1. 通道创建后不会立即关闭。
  2. 在 rx 移至单独的任务后,通道报告自身已关闭。
  3. 就在通道报告自身已关闭的那一刻,保存移动的 rx 的生成线程似乎甚至没有启动。因此我猜不能被丢弃。

https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Sender.html。 文档说,只有当 rx 句柄被删除或显式调用

close
函数时,tx 报告本身才会关闭。在这种情况下似乎两者都不是。

 fn websocket(base: &Url, id: &str) -> Result<(Sender<String>, Stream<String>), BoxErr> {
        
        ...

        let ws_reciver_sink = Sink::new();
        let ws_receiver_stream = ws_reciver_sink.stream();

        let (ws_sender_tx, mut ws_sender_rx) = mpsc::channel(100);
        debug!("ws_sender_channel is closed 1: {}", ws_sender_tx.is_closed());

        runtime::Builder::new_current_thread()
            .enable_all()
            .build()?
            .block_on(async move {
                let (ws_stream, _res) = connect_async(url).await?;

                let (mut ws_sender_inner, mut ws_receiver_inner) = ws_stream.split();

                debug!("spawning ws_recv_task");
                let ws_recv_task = tokio::spawn(async move {
                    while let Some(Ok(Message::Text(msg))) = ws_receiver_inner.next().await {
                        ws_reciver_sink.send(msg);
                    }
                });
                debug!("spawning ws_send_task");

                let ws_send_task = tokio::spawn(async move {
                    debug!("moving ws_sender_rx handle");
                    while let Some(msg) = ws_sender_rx.recv().await {
                        if ws_sender_inner.send(Message::Text(msg)).await.is_err() {
                            ws_recv_task.abort();
                        }
                        debug!("dropping ws_sender_rx handle");
                    }
                });
                Ok::<(), BoxErr>(())
            })?;
            debug!("ws_sender_channel is closed 2: {}", ws_sender_tx.is_closed());
        Ok((ws_sender_tx, ws_receiver_stream))
    }

输出

[2022-12-13T11:58:45Z DEBUG reqwest::connect] starting new connection: http://127.0.0.1:8000/
[2022-12-13T11:58:45Z DEBUG iot] connecting to websocket ws://127.0.0.1:8000/node/test-a
[2022-12-13T11:58:45Z DEBUG iot] ws_sender_channel is closed 1: false
[2022-12-13T11:58:45Z DEBUG tungstenite::handshake::client] Client handshake done.
[2022-12-13T11:58:45Z DEBUG iot] spawning ws_recv_task
[2022-12-13T11:58:45Z DEBUG iot] spawning ws_send_task
[2022-12-13T11:58:45Z DEBUG iot] ws_sender_channel is closed 2: true
[2022-12-13T11:58:45Z DEBUG reqwest::connect] starting new connection: http://127.0.0.1:8000/
[2022-12-13T11:58:45Z DEBUG iot] ws_sender_channel is closed: true
thread 'tests::test_connect' panicked at 'called `Result::unwrap()` on an `Err` value: SendError("Some nights")', iot/src/lib.rs:199:50
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

我错过了什么?请赐教。

asynchronous rust rust-tokio
1个回答
0
投票

我偶然发现了完全相同的情况和几乎相同的代码:-)

 let (tx, rx) = tokio::sync::mpsc::channel::<Pos2>(5);

解决方案是在异步运行时实例化通道

[tokio::main] // <---------------------------
async fn main() -> Result<(), Box<dyn Error>> {
    ...
    let (tx, rx) = tokio::sync::mpsc::channel::<Pos2>(5);

    // async code
    comm::connect(Handle::current(), rx);

    ...
    // sync code here
    ...
}

--- 异步代码 ---

pub fn connect(runtime: Handle, mut rx: Receiver<Pos2>) {
    thread::spawn(move || {
        runtime.block_on(async {
               ...
        })
    })
© www.soinside.com 2019 - 2024. All rights reserved.