我需要创建一个由各种进程组成的分布式系统;为了简化问题,假设它们是三个:A、B 和 C。每个进程都知道其他进程的 IP 和端口,从中接收/发送 UDP 数据报。他们都使用相同的 UDP 端口(假设端口 8080)。
假设A的IP为192.168.1.1,B的IP为192.168.1.2,C的IP为192.168.1.3:
进程相互通信,拒绝来自其他进程的数据报.
我要启用的通信可以用以下元组来描述。所有行都不同,因此它们应该描述有效的设置。
源地址 | 源端口 | 目的地地址 | 目的港 | 协议 |
---|---|---|---|---|
192.168.1.1 | 8080 | 192.168.1.2 | 8080 | UDP |
192.168.1.1 | 8080 | 192.168.1.3 | 8080 | UDP |
192.168.1.2 | 8080 | 192.168.1.1 | 8080 | UDP |
192.168.1.2 | 8080 | 192.168.1.3 | 8080 | UDP |
192.168.1.3 | 8080 | 192.168.1.1 | 8080 | UDP |
192.168.1.3 | 8080 | 192.168.1.2 | 8080 | UDP |
如何在 Rust + Tokio 中实现这一点?
在 Rust 中,connecting a socket 意味着设置
send()
的默认目的地,并限制通过 recv
从指定地址读取的数据包。这个概念并不特定于 Rust:POSIX 套接字(例如 UDP 套接字)的行为方式相同.
这就是我的想法:
// A's startup code
let sockforb = UdpSocket::bind("0.0.0.0:8080").await?;
let sockforc = UdpSocket::bind("0.0.0.0:8080").await?;
let b_addr = "192.168.1.2:8080".parse::<SocketAddr>().unwrap();
let c_addr = "192.168.1.3:8080".parse::<SocketAddr>().unwrap();
sockforb.connect(b_addr).await?;
sockforc.connect(c_addr).await?;
上面的代码会很方便:我有两个 distinct 套接字变量,如果我对它们调用
send
/recv
,我会向/从所需的进程发送/接收数据报。
但是,代码会产生以下错误:
Error: Os { code: 98, kind: AddrInUse, message: "Address already in use" }
作为一种解决方法,我可以只定义一个套接字变量并将多个地址传递给连接方法(参数的类型为
ToSocketAddrs
)。这让我只能向/从指定进程发送/接收数据报。然而,虽然这个解决方案没有错误,但它并不方便,因为我有一个单一的套接字变量,与不同进程的多个变量形成对比。我的意图是拥有不同的套接字变量,以便将它们放入每个进程的不同结构中。
我如何在 Rust + Tokio 中实现这一点,可能 使用可移植(非操作系统依赖)代码?
可以在像 MacOS 和 Linux 这样的操作系统的 unix 上设置
SO_REUSEADDRESS
和SO_REUSEPORT
。我不确定它在 Windows 上是否可行,但它可能只设置 SO_REUSEADDRESS
,在下面我使用 net2
板条箱在 UdpBuilder
和 UnixUdpBuilderExt
中进行安全抽象:
use std::{net::UdpSocket, time::Duration};
use std::io;
use net2::{unix::UnixUdpBuilderExt, UdpBuilder};
#[tokio::main]
async fn main() {
let build = || -> io::Result<UdpBuilder> {
let b = UdpBuilder::new_v4()?;
b.reuse_address(true)?.reuse_port(true)?;
Ok(b)
};
let a = build().unwrap().bind("0.0.0.0:8080").unwrap();
a.connect("127.0.0.1:8081").unwrap();
let a = tokio::net::UdpSocket::from_std(a).unwrap();
let b = build().unwrap().bind("0.0.0.0:8080").unwrap();
b.connect("127.0.0.1:8082").unwrap();
let b = tokio::net::UdpSocket::from_std(b).unwrap();
let ha = tokio::task::spawn(async move {
let mut buf = [0; 1024];
while let Ok(n) = a.recv(&mut buf).await {
println!("a: {:?}", String::from_utf8_lossy(&buf[..n]));
}
});
let hb = tokio::task::spawn(async move {
let mut buf = [0; 1024];
while let Ok(n) = b.recv(&mut buf).await {
println!("b: {:?}", String::from_utf8_lossy(&buf[..n]));
}
});
let hc = tokio::task::spawn(async {
let s = UdpSocket::bind("0.0.0.0:8081").expect("connecting to a");
s.connect("127.0.0.1:8080").unwrap();
for _ in 0..10 {
s.send(b"hello a").unwrap();
tokio::time::sleep(Duration::from_millis(510)).await;
}
});
let hd = tokio::task::spawn(async {
let s = UdpSocket::bind("0.0.0.0:8082").expect("connecting to b");
s.connect("127.0.0.1:8080").unwrap();
for _ in 0..10 {
s.send(b"hello b").unwrap();
tokio::time::sleep(Duration::from_millis(510)).await;
}
});
ha.await.unwrap();
hb.await.unwrap();
hc.await.unwrap();
hd.await.unwrap();
}