我有一个程序需要处理大量数据包。最终,我希望能够处理大约 350k pkts/s。为了解决这个问题,我迭代了几个版本的代码,最终得到了下面的代码。
作为参考,该代码在一个 ubuntu VM 上运行,该 VM 有 4 个专用内核和一个比该应用程序所需的大得多的 NIC。
在大多数情况下,运行以下代码仅加载一个核心。主要是内核调用,我猜是针对 IO 的。
Pcap(如果准确的话)仅报告程序缓冲区丢弃的数据包,而不是接口本身。
仅以 87k pkts/s 观察到丢弃的数据包。如果我想通过它获得 4-5 倍的数据加载,我必须显着改进这一点。
请参阅代码下面的问题...
[dependencies]
pcap = { version = "1.1.0", features = ["all-features", "capture-stream"] }
tokio = { version = "1.32.0", features = ["full"] }
futures = { version = "0.3.28"}
use pcap::{Active, Capture, Inactive, Error, Packet, PacketCodec, PacketStream};
use tokio::sync::mpsc;
use futures::StreamExt;
// Simple codec that returns owned copies, since the result may not
// reference the input packet.
pub struct BoxCodec;
impl PacketCodec for BoxCodec {
type Item = Box<[u8]>;
fn decode(&mut self, packet: Packet) -> Self::Item {
packet.data.into()
}
}
fn new_stream(capture_inactive: Capture<Inactive>) -> Result<PacketStream<Active, BoxCodec>, Error> {
let cap = capture_inactive
.promisc(true)
.immediate_mode(true)
.open()?
.setnonblock()?;
cap.stream(BoxCodec)
}
// generate a dummy layer 2 packet that we can easily find in wireshark
async fn generate_packet() -> Vec<u8> {
// Define source and destination MAC addresses
let src_mac = [0x00, 0x11, 0x22, 0x33, 0x44, 0x55];
let dest_mac = [0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF];
// Create the Ethernet frame
let mut pkt: Vec<u8> = Vec::new();
// Destination MAC address
pkt.extend_from_slice(&dest_mac);
// Source MAC address
pkt.extend_from_slice(&src_mac);
// EtherType (0x0800 for IPv4)
pkt.extend_from_slice(&[0x08, 0x00]);
// Custom payload
let payload: [u8; 10] = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A];
pkt.extend_from_slice(&payload);
pkt
}
#[tokio::main]
async fn main() {
let capture_inactive = Capture::from_device("ens192").unwrap();
let (tx, mut rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) = mpsc::channel(1024);
let (tx_msg, mut rx_msg): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) = mpsc::channel(1024);
// spawn the process for reading packets from the interface...
tokio::spawn(async move {
let mut stream = new_stream(capture_inactive).expect("Failed to create stream");
let mut count = 0;
loop {
tokio::select! {
packet = stream.next() => { // packet is Option<Result<Box>>
count += 1;
if count % 1_000_000 == 0 {
let cap = stream.capture_mut();
let stats = cap.stats().unwrap();
println!(
"Received: {}, dropped: {}, if_dropped: {}",
stats.received, stats.dropped, stats.if_dropped
);
}
if let Some(Ok(data)) = packet {
let _send_result = tx.send(data.to_vec()).await;
}
},
data = rx_msg.recv() => {
let _ = stream.capture_mut().sendpacket(data.unwrap());
}
}
}
});
let worker_handle = tokio::spawn(async move {
let mut count = 0;
loop {
match rx.recv().await {
Some(_packet) => {
count += 1;
if count % 100_000 == 0 {
println!("Processed {} packets", count);
}
if count % 100_000 == 0 {
let data = generate_packet().await;
let _msg = tx_msg.send(data).await;
}
}
None => {
println!("Worker task ended");
break;
}
}
}
});
worker_handle.await.unwrap();
}
这是显示程序输出和 htop 统计信息的屏幕截图。
我可以采取哪些措施来提高绩效?虽然我现在找不到它,但我想我读到 tokio 默认使用所有可用的核心。为了一笑,我确实在 tokio 上运行了一个带有多线程选项的版本。处理这个问题并改变可用工人的数量似乎并没有以任何积极的方式影响性能。
#[tokio::main(flavor = "multi_thread", worker_threads = 8)]
我正在尝试卸载主线程,以允许它除了摄取数据包之外不执行任何操作,但这似乎并不是这里发生的情况。
想法?
你必须放弃 Tokio,转而与 LMAX 颠覆者合作。
https://lmax-exchange.github.io/disruptor/disruptor.html
幸运的是,有一个 Rust 端口。
https://github.com/sklose/disrustor
根据记忆,如果您遵循性能说明并将每个执行器固定到专用核心,则每秒可以处理数百万个事件。
https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results
希望有帮助。