Reputation: 65
I've got a program that needs to handle a lot of packets, ultimately around 350k pkts/s. To get there, I've iterated through several versions and ended up with the code below.
For reference, the code is running on an Ubuntu VM with 4 dedicated cores and a NIC with far more capacity than this application needs.
Running the code below loads only one core, for the most part, and most of that time is spent in kernel calls, which I'm guessing is I/O.
Pcap (if its stats are accurate) is reporting dropped packets only for the program buffer -- not the interface itself.
The drops are showing up at only 87k pkts/s, so I need a significant improvement to push 4-5x that load through.
See the question below the code...
[dependencies]
pcap = { version = "1.1.0", features = ["all-features", "capture-stream"] }
tokio = { version = "1.32.0", features = ["full"] }
futures = { version = "0.3.28"}
use pcap::{Active, Capture, Inactive, Error, Packet, PacketCodec, PacketStream};
use tokio::sync::mpsc;
use futures::StreamExt;

// Simple codec that returns owned copies, since the result may not
// reference the input packet.
pub struct BoxCodec;

impl PacketCodec for BoxCodec {
    type Item = Box<[u8]>;

    fn decode(&mut self, packet: Packet) -> Self::Item {
        packet.data.into()
    }
}

fn new_stream(
    capture_inactive: Capture<Inactive>,
) -> Result<PacketStream<Active, BoxCodec>, Error> {
    let cap = capture_inactive
        .promisc(true)
        .immediate_mode(true)
        .open()?
        .setnonblock()?;
    cap.stream(BoxCodec)
}
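// Hedged side note: since the drops described above are charged to the
// program buffer, one knob worth trying is pcap's buffer_size builder, which
// has to be set on the Capture<Inactive> before open(). The function name and
// the 16 MiB value below are arbitrary illustrations, not tested
// recommendations.
fn new_stream_with_bigger_buffer(
    capture_inactive: Capture<Inactive>,
) -> Result<PacketStream<Active, BoxCodec>, Error> {
    let cap = capture_inactive
        .promisc(true)
        .immediate_mode(true)
        .buffer_size(16 * 1024 * 1024) // bytes handed to libpcap for its ring
        .open()?
        .setnonblock()?;
    cap.stream(BoxCodec)
}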
// generate a dummy layer 2 packet that we can easily find in wireshark
async fn generate_packet() -> Vec<u8> {
    // Define source and destination MAC addresses
    let src_mac = [0x00, 0x11, 0x22, 0x33, 0x44, 0x55];
    let dest_mac = [0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF];

    // Create the Ethernet frame
    let mut pkt: Vec<u8> = Vec::new();
    // Destination MAC address
    pkt.extend_from_slice(&dest_mac);
    // Source MAC address
    pkt.extend_from_slice(&src_mac);
    // EtherType (0x0800 for IPv4)
    pkt.extend_from_slice(&[0x08, 0x00]);
    // Custom payload
    let payload: [u8; 10] = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A];
    pkt.extend_from_slice(&payload);
    pkt
}
#[tokio::main]
async fn main() {
    let capture_inactive = Capture::from_device("ens192").unwrap();
    let (tx, mut rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) = mpsc::channel(1024);
    let (tx_msg, mut rx_msg): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) = mpsc::channel(1024);

    // spawn the task for reading packets from the interface...
    tokio::spawn(async move {
        let mut stream = new_stream(capture_inactive).expect("Failed to create stream");
        let mut count = 0;
        loop {
            tokio::select! {
                packet = stream.next() => { // packet is Option<Result<Box<[u8]>, Error>>
                    count += 1;
                    if count % 1_000_000 == 0 {
                        let cap = stream.capture_mut();
                        let stats = cap.stats().unwrap();
                        println!(
                            "Received: {}, dropped: {}, if_dropped: {}",
                            stats.received, stats.dropped, stats.if_dropped
                        );
                    }
                    if let Some(Ok(data)) = packet {
                        let _send_result = tx.send(data.to_vec()).await;
                    }
                },
                data = rx_msg.recv() => {
                    let _ = stream.capture_mut().sendpacket(data.unwrap());
                }
            }
        }
    });

    let worker_handle = tokio::spawn(async move {
        let mut count = 0;
        loop {
            match rx.recv().await {
                Some(_packet) => {
                    count += 1;
                    if count % 100_000 == 0 {
                        println!("Processed {} packets", count);
                    }
                    if count % 100_000 == 0 {
                        let data = generate_packet().await;
                        let _msg = tx_msg.send(data).await;
                    }
                }
                None => {
                    println!("Worker task ended");
                    break;
                }
            }
        }
    });

    worker_handle.await.unwrap();
}
Here is a screenshot showing the program output and the htop stats.
What can I look at to improve performance? Although I can't find the reference at the moment, I thought I read that tokio defaults to using all available cores. For grins, I did run a version with tokio's multi-thread options; playing with this and changing the number of worker threads did not seem to improve performance at all.
#[tokio::main(flavor = "multi_thread", worker_threads = 8)]
I'm trying to offload everything else from the main thread so it does nothing but ingest packets, but that doesn't appear to be what's happening here.
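For what it's worth, here's a minimal sketch of the shape I'm aiming for: a plain OS thread that does nothing but run a blocking capture loop and hand packets to the tokio side over a bounded channel. This is untested; the device name and the channel depth of 4096 are placeholders.

use pcap::Capture;
use tokio::sync::mpsc;

fn main() {
    // Bounded channel between the capture thread and the tokio runtime.
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(4096);

    // Dedicated OS thread: a blocking capture loop and nothing else.
    std::thread::spawn(move || {
        let mut cap = Capture::from_device("ens192")
            .unwrap()
            .promisc(true)
            .open()
            .unwrap();
        while let Ok(packet) = cap.next_packet() {
            // blocking_send applies backpressure without entering the runtime
            if tx.blocking_send(packet.data.to_vec()).is_err() {
                break; // receiver gone, shut down
            }
        }
    });

    // The tokio runtime only runs the processing side.
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async move {
            let mut count = 0u64;
            while let Some(_pkt) = rx.recv().await {
                count += 1;
                if count % 1_000_000 == 0 {
                    println!("Processed {} packets", count);
                }
            }
        });
}

The idea is that the ingest thread never touches the async scheduler, and blocking_send gives backpressure when the workers fall behind.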
Ideas?
Upvotes: 0
Views: 880
Reputation: 1638
You have to let go of Tokio and instead work with the LMAX disruptor.
https://lmax-exchange.github.io/disruptor/disruptor.html
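To make the idea concrete, here is a toy single-producer/single-consumer ring in the Disruptor spirit: slots preallocated once up front, a published-sequence cursor, and busy-spin waiting instead of locks or async wakeups. This is only a sketch of the pattern, not disrustor's actual API.

use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

const SLOTS: usize = 1024; // power of two, so index = seq & (SLOTS - 1)

struct Ring {
    slots: Vec<AtomicU64>,   // preallocated once; the hot path never allocates
    published: AtomicUsize,  // next sequence the producer will publish
    consumed: AtomicUsize,   // next sequence the consumer will read
}

fn main() {
    let ring = Arc::new(Ring {
        slots: (0..SLOTS).map(|_| AtomicU64::new(0)).collect(),
        published: AtomicUsize::new(0),
        consumed: AtomicUsize::new(0),
    });

    let consumer = {
        let ring = Arc::clone(&ring);
        thread::spawn(move || {
            for seq in 0..1_000_000usize {
                // busy-spin until the producer has published this sequence
                while ring.published.load(Ordering::Acquire) <= seq {
                    std::hint::spin_loop();
                }
                let _value = ring.slots[seq & (SLOTS - 1)].load(Ordering::Acquire);
                ring.consumed.store(seq + 1, Ordering::Release);
            }
        })
    };

    for seq in 0..1_000_000usize {
        // busy-spin until the slot we are about to overwrite has been consumed
        while seq >= ring.consumed.load(Ordering::Acquire) + SLOTS {
            std::hint::spin_loop();
        }
        ring.slots[seq & (SLOTS - 1)].store(seq as u64, Ordering::Release);
        ring.published.store(seq + 1, Ordering::Release);
    }

    consumer.join().unwrap();
}

The point is that the hot path never allocates, never takes a lock, and never parks a thread; the real libraries add batching and configurable wait strategies on top of this.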
Luckily there is a Rust port.
https://github.com/sklose/disrustor
From memory, if you follow the performance instructions and pin each executor to a dedicated core, you can process millions of events per second.
https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results
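The pinning itself can look something like this, assuming the core_affinity crate (disrustor's own setup may differ, and the core indices here are arbitrary):

use std::thread;

fn main() {
    // Enumerate the cores the OS will let us pin to.
    let core_ids = core_affinity::get_core_ids().expect("could not enumerate cores");
    assert!(core_ids.len() >= 2, "need two cores for producer and consumer");

    // Consumer busy-spins on its own dedicated core.
    let consumer_core = core_ids[1];
    let consumer = thread::spawn(move || {
        core_affinity::set_for_current(consumer_core);
        // ... consumer loop polling the ring buffer goes here ...
    });

    // Producer gets its own core too.
    core_affinity::set_for_current(core_ids[0]);
    // ... producer loop publishing events goes here ...

    consumer.join().unwrap();
}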
Hope that helps.
Upvotes: 0