将两个UDP套接字绑定到同一个地址,并连接到不同的地址

问题描述 投票:0回答:1

我需要创建一个由各种进程组成的分布式系统;为了简化问题,假设它们是三个:A、B 和 C。每个进程都知道其他进程的 IP 和端口,从中接收/发送 UDP 数据报。他们都使用相同的 UDP 端口(假设端口 8080)。

假设A的IP为192.168.1.1,B的IP为192.168.1.2,C的IP为192.168.1.3:

  • A 有两个 UDP 套接字,都绑定到 0.0.0.0:8080。第一个连接到 192.168.1.2:8080,第二个连接到 192.168.1.3:8080。它使用第一个套接字从/向 B 接收和发送数据报,第二个套接字从/向 C 接收和发送数据报。
  • B 有两个 UDP 套接字,都绑定到 0.0.0.0:8080。第一个连接到 192.168.1.1:8080,第二个连接到 192.168.1.3:8080。它使用第一个套接字从/向 A 接收和发送数据报,第二个套接字从/向 C 接收和发送数据报。
  • C 有两个 UDP 套接字,都绑定到 0.0.0.0:8080。第一个连接到 192.168.1.1:8080,第二个连接到 192.168.1.2:8080。它使用第一个套接字从/向 A 接收和发送数据报,第二个套接字从/向 B 接收和发送数据报。

进程相互通信,拒绝来自其他进程的数据报.

我要启用的通信可以用以下元组来描述。所有行都不同,因此它们应该描述有效的设置。

源地址 源端口 目的地地址 目的港 协议
192.168.1.1 8080 192.168.1.2 8080 UDP
192.168.1.1 8080 192.168.1.3 8080 UDP
192.168.1.2 8080 192.168.1.1 8080 UDP
192.168.1.2 8080 192.168.1.3 8080 UDP
192.168.1.3 8080 192.168.1.1 8080 UDP
192.168.1.3 8080 192.168.1.2 8080 UDP

如何在 Rust + Tokio 中实现这一点?

在 Rust 中,connecting a socket 意味着设置

send()
的默认目的地,并限制通过
recv
从指定地址读取的数据包。这个概念并不特定于 Rust:POSIX 套接字(例如 UDP 套接字)的行为方式相同.

这就是我的想法:

// A's startup code

let sockforb = UdpSocket::bind("0.0.0.0:8080").await?;
let sockforc = UdpSocket::bind("0.0.0.0:8080").await?;

let b_addr = "192.168.1.2:8080".parse::<SocketAddr>().unwrap();
let c_addr = "192.168.1.3:8080".parse::<SocketAddr>().unwrap();

sockforb.connect(b_addr).await?;
sockforc.connect(c_addr).await?;

上面的代码会很方便:我有两个 distinct 套接字变量,如果我对它们调用

send
/
recv
,我会向/从所需的进程发送/接收数据报。

但是,代码会产生以下错误:

Error: Os { code: 98, kind: AddrInUse, message: "Address already in use" }

作为一种解决方法,我可以只定义一个套接字变量并将多个地址传递给连接方法(参数的类型为

ToSocketAddrs
)。这让我只能向/从指定进程发送/接收数据报。然而,虽然这个解决方案没有错误,但它并不方便,因为我有一个单一的套接字变量,与不同进程的多个变量形成对比。我的意图是拥有不同的套接字变量,以便将它们放入每个进程的不同结构中。

我如何在 Rust + Tokio 中实现这一点,可能 使用可移植(非操作系统依赖)代码?

sockets rust udp rust-tokio
1个回答
0
投票

可以在像 MacOS 和 Linux 这样的操作系统的 unix 上设置

SO_REUSEADDRESS
SO_REUSEPORT
。我不确定它在 Windows 上是否可行,但它可能只设置
SO_REUSEADDRESS
,在下面我使用
net2
板条箱在
UdpBuilder
UnixUdpBuilderExt
中进行安全抽象:

use std::{net::UdpSocket, time::Duration};

use std::io;

use net2::{unix::UnixUdpBuilderExt, UdpBuilder};

#[tokio::main]
async fn main() {
    let build = || -> io::Result<UdpBuilder> {
        let b = UdpBuilder::new_v4()?;
        b.reuse_address(true)?.reuse_port(true)?;
        Ok(b)
    };

    let a = build().unwrap().bind("0.0.0.0:8080").unwrap();
    a.connect("127.0.0.1:8081").unwrap();
    let a = tokio::net::UdpSocket::from_std(a).unwrap();

    let b = build().unwrap().bind("0.0.0.0:8080").unwrap();
    b.connect("127.0.0.1:8082").unwrap();
    let b = tokio::net::UdpSocket::from_std(b).unwrap();

    let ha = tokio::task::spawn(async move {
        let mut buf = [0; 1024];
        while let Ok(n) = a.recv(&mut buf).await {
            println!("a: {:?}", String::from_utf8_lossy(&buf[..n]));
        }
    });

    let hb = tokio::task::spawn(async move {
        let mut buf = [0; 1024];
        while let Ok(n) = b.recv(&mut buf).await {
            println!("b: {:?}", String::from_utf8_lossy(&buf[..n]));
        }
    });

    let hc = tokio::task::spawn(async {
        let s = UdpSocket::bind("0.0.0.0:8081").expect("connecting to a");
        s.connect("127.0.0.1:8080").unwrap();
        for _ in 0..10 {
            s.send(b"hello a").unwrap();
            tokio::time::sleep(Duration::from_millis(510)).await;
        }
    });

    let hd = tokio::task::spawn(async {
        let s = UdpSocket::bind("0.0.0.0:8082").expect("connecting to b");
        s.connect("127.0.0.1:8080").unwrap();
        for _ in 0..10 {
            s.send(b"hello b").unwrap();
            tokio::time::sleep(Duration::from_millis(510)).await;
        }
    });
    ha.await.unwrap();
    hb.await.unwrap();
    hc.await.unwrap();
    hd.await.unwrap();
}
© www.soinside.com 2019 - 2024. All rights reserved.