我正在尝试使用
tokio::mpsc::channel
将数据从同步函数发送到 tokio 线程以异步处理它。
由于
tokio::mpsc::channel
是一个异步函数,因此我从同步函数生成一个运行时来创建 rx 和 tx,并在将 rx 移动到其中新生成的任务后返回 tx。
但是,并没有达到我的预期,我进行了一些调试,发现了以下情况。
https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Sender.html。 文档说,只有当 rx 句柄被删除或显式调用
close
函数时,tx 报告本身才会关闭。在这种情况下似乎两者都不是。
fn websocket(base: &Url, id: &str) -> Result<(Sender<String>, Stream<String>), BoxErr> {
...
let ws_reciver_sink = Sink::new();
let ws_receiver_stream = ws_reciver_sink.stream();
let (ws_sender_tx, mut ws_sender_rx) = mpsc::channel(100);
debug!("ws_sender_channel is closed 1: {}", ws_sender_tx.is_closed());
runtime::Builder::new_current_thread()
.enable_all()
.build()?
.block_on(async move {
let (ws_stream, _res) = connect_async(url).await?;
let (mut ws_sender_inner, mut ws_receiver_inner) = ws_stream.split();
debug!("spawning ws_recv_task");
let ws_recv_task = tokio::spawn(async move {
while let Some(Ok(Message::Text(msg))) = ws_receiver_inner.next().await {
ws_reciver_sink.send(msg);
}
});
debug!("spawning ws_send_task");
let ws_send_task = tokio::spawn(async move {
debug!("moving ws_sender_rx handle");
while let Some(msg) = ws_sender_rx.recv().await {
if ws_sender_inner.send(Message::Text(msg)).await.is_err() {
ws_recv_task.abort();
}
debug!("dropping ws_sender_rx handle");
}
});
Ok::<(), BoxErr>(())
})?;
debug!("ws_sender_channel is closed 2: {}", ws_sender_tx.is_closed());
Ok((ws_sender_tx, ws_receiver_stream))
}
输出
[2022-12-13T11:58:45Z DEBUG reqwest::connect] starting new connection: http://127.0.0.1:8000/
[2022-12-13T11:58:45Z DEBUG iot] connecting to websocket ws://127.0.0.1:8000/node/test-a
[2022-12-13T11:58:45Z DEBUG iot] ws_sender_channel is closed 1: false
[2022-12-13T11:58:45Z DEBUG tungstenite::handshake::client] Client handshake done.
[2022-12-13T11:58:45Z DEBUG iot] spawning ws_recv_task
[2022-12-13T11:58:45Z DEBUG iot] spawning ws_send_task
[2022-12-13T11:58:45Z DEBUG iot] ws_sender_channel is closed 2: true
[2022-12-13T11:58:45Z DEBUG reqwest::connect] starting new connection: http://127.0.0.1:8000/
[2022-12-13T11:58:45Z DEBUG iot] ws_sender_channel is closed: true
thread 'tests::test_connect' panicked at 'called `Result::unwrap()` on an `Err` value: SendError("Some nights")', iot/src/lib.rs:199:50
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
我错过了什么?请赐教。
我偶然发现了完全相同的情况和几乎相同的代码:-)
let (tx, rx) = tokio::sync::mpsc::channel::<Pos2>(5);
解决方案是在异步运行时实例化通道
[tokio::main] // <---------------------------
async fn main() -> Result<(), Box<dyn Error>> {
...
let (tx, rx) = tokio::sync::mpsc::channel::<Pos2>(5);
// async code
comm::connect(Handle::current(), rx);
...
// sync code here
...
}
--- 异步代码 ---
pub fn connect(runtime: Handle, mut rx: Receiver<Pos2>) {
thread::spawn(move || {
runtime.block_on(async {
...
})
})