如何使用mpsc通道在线程之间创建环形通信?

问题描述 投票:1回答:1

我想产生n个线程,并能够与环形拓扑中的其他线程进行通信,例如线程0可以将消息发送到线程1,将线程1发送到线程2,依此类推,将线程n发送到线程0。

这是我想通过n = 3实现的示例:

use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

let (tx0, rx0): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let (tx1, rx1): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let (tx2, rx2): (Sender<i32>, Receiver<i32>) = mpsc::channel();

let child0 = thread::spawn(move || {
    tx0.send(0).unwrap();
    println!("thread 0 sent: 0");
    println!("thread 0 recv: {:?}", rx2.recv().unwrap());
});
let child1 = thread::spawn(move || {
    tx1.send(1).unwrap();
    println!("thread 1 sent: 1");
    println!("thread 1 recv: {:?}", rx0.recv().unwrap());
});
let child2 = thread::spawn(move || {
    tx2.send(2).unwrap();
    println!("thread 2 sent: 2");
    println!("thread 2 recv: {:?}", rx1.recv().unwrap());
});

child0.join();
child1.join();
child2.join();

这里我在循环中创建通道,将它们存储在向量中,对发送方进行重新排序,将它们存储在新的向量中,然后生成线程,每个线程都具有自己的Sender-Receiver(tx1 / rx0,tx2 / rx1等)对。 。

const NTHREADS: usize = 8;

// create n channels
let channels: Vec<(Sender<i32>, Receiver<i32>)> =
    (0..NTHREADS).into_iter().map(|_| mpsc::channel()).collect();

// switch tupel entries for the senders to create ring topology
let mut channels_ring: Vec<(Sender<i32>, Receiver<i32>)> = (0..NTHREADS)
    .into_iter()
    .map(|i| {
        (
            channels[if i < channels.len() - 1 { i + 1 } else { 0 }].0,
            channels[i].1,
        )
    })
    .collect();

let mut children = Vec::new();
for i in 0..NTHREADS {
    let (tx, rx) = channels_ring.remove(i);

    let child = thread::spawn(move || {
        tx.send(i).unwrap();
        println!("thread {} sent: {}", i, i);
        println!("thread {} recv: {:?}", i, rx.recv().unwrap());
    });

    children.push(child);
}

for child in children {
    let _ = child.join();
}

这不起作用,因为无法复制发件人以创建新的载体。但是,如果使用引用( Sender):

let mut channels_ring: Vec<(&Sender<i32>, Receiver<i32>)> = (0..NTHREADS)
    .into_iter()
    .map(|i| {
        (
            &channels[if i < channels.len() - 1 { i + 1 } else { 0 }].0,
            channels[i].1,
        )
    })
    .collect();

我无法生成线程,因为无法在线程之间安全地共享std::sync::mpsc::Sender<i32>

multithreading rust channel
1个回答
0
投票
© www.soinside.com 2019 - 2024. All rights reserved.