PilotGuy

Reputation: 65

Performance optimizations in tokio and pcap

Background

I've got a program that needs to handle a lot of packets. Ultimately, I'm looking to handle around 350k pkts/s. To tackle this, I've iterated through several versions of code and ended up with the code below.

For reference, the code is running on an Ubuntu VM with 4 dedicated cores and a NIC way bigger than is needed for this application.

Observations

Running the code below loads only one core, for the most part. Most of that time is spent in kernel calls, which I'm guessing is I/O.

pcap's stats (if accurate) only report packets dropped from the program's buffer (dropped) -- not from the interface itself (if_dropped).

The dropped packets are being observed at only 87k pkts/s, so I've got to improve this significantly if I want to push 4-5x that load through it.

See the question below the code...

[dependencies]
pcap = { version = "1.1.0", features = ["all-features", "capture-stream"] }
tokio = { version = "1.32.0", features = ["full"] }
futures = { version = "0.3.28" }

use pcap::{Active, Capture, Inactive, Error, Packet, PacketCodec, PacketStream};
use tokio::sync::mpsc;
use futures::StreamExt;

// Simple codec that returns owned copies, since the result may not
// reference the input packet.
pub struct BoxCodec;

impl PacketCodec for BoxCodec {
    type Item = Box<[u8]>;

    fn decode(&mut self, packet: Packet) -> Self::Item {
        packet.data.into()
    }
}

fn new_stream(capture_inactive: Capture<Inactive>) -> Result<PacketStream<Active, BoxCodec>, Error> {
    let cap = capture_inactive
        .promisc(true)
        .immediate_mode(true)
        .open()?
        .setnonblock()?;
    cap.stream(BoxCodec)
}

// generate a dummy layer 2 packet that we can easily find in wireshark
async fn generate_packet() -> Vec<u8> {
    // Define source and destination MAC addresses
    let src_mac = [0x00, 0x11, 0x22, 0x33, 0x44, 0x55];
    let dest_mac = [0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF];

    // Create the Ethernet frame
    let mut pkt: Vec<u8> = Vec::new();

    // Destination MAC address
    pkt.extend_from_slice(&dest_mac);

    // Source MAC address
    pkt.extend_from_slice(&src_mac);

    // EtherType (0x0800 for IPv4)
    pkt.extend_from_slice(&[0x08, 0x00]);

    // Custom payload
    let payload: [u8; 10] = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A];
    pkt.extend_from_slice(&payload);
    
    pkt
}

#[tokio::main]
async fn main() {
    let capture_inactive = Capture::from_device("ens192").unwrap();

    let (tx, mut rx): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) = mpsc::channel(1024);
    let (tx_msg, mut rx_msg): (mpsc::Sender<Vec<u8>>, mpsc::Receiver<Vec<u8>>) = mpsc::channel(1024);

    // spawn the process for reading packets from the interface...
    tokio::spawn(async move {
        let mut stream = new_stream(capture_inactive).expect("Failed to create stream");
        let mut count = 0;
        loop {
            tokio::select! {
                packet = stream.next() => { // packet is Option<Result<Box<[u8]>, Error>>
                    count += 1;
                    if count % 1_000_000 == 0 {
                        let cap = stream.capture_mut();
                        let stats = cap.stats().unwrap();
                        println!(
                            "Received: {}, dropped: {}, if_dropped: {}",
                            stats.received, stats.dropped, stats.if_dropped
                        );
                    }

                    if let Some(Ok(data)) = packet {
                        let _send_result = tx.send(data.to_vec()).await;
                    }
                },
                data = rx_msg.recv() => {
                    // recv() yields None once the worker's tx_msg is dropped
                    if let Some(data) = data {
                        let _ = stream.capture_mut().sendpacket(data);
                    }
                }
            }
        }
    });

    let worker_handle = tokio::spawn(async move {
        let mut count = 0;
        loop {
            match rx.recv().await {
                Some(_packet) => {
                    count += 1;
                    if count % 100_000 == 0 {
                        println!("Processed {} packets", count);
                    }
                    if count % 100_000 == 0 {
                        let data = generate_packet().await;
                        let _msg = tx_msg.send(data).await;
                    }
                }
                None => {
                    println!("Worker task ended");
                    break;
                }
            }
        }
    });

    worker_handle.await.unwrap();
}

Here is a screenshot showing the program output and the htop stats:

[Screenshot: htop and program output]

Question

What can I look at to improve performance? Although I can't find the reference at the moment, I thought I read that tokio defaults to using all of the available cores. For grins, I did run a version with tokio's multi-threaded options; playing with this and changing the number of worker threads did not seem to improve performance at all.

#[tokio::main(flavor = "multi_thread", worker_threads = 8)]

I'm trying to offload work from the main thread so that it does nothing but ingest packets, but that doesn't appear to be what's happening here.
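
To be concrete, this is roughly the shape I'm aiming for: the capture loop on its own dedicated OS thread doing blocking reads, feeding the tokio workers over a channel. This is only a sketch of the intent, not something I've benchmarked -- the blocking next_packet() loop and blocking_send usage are my assumptions:

use pcap::Capture;
use tokio::sync::mpsc;

fn spawn_capture_thread(tx: mpsc::Sender<Box<[u8]>>) -> std::thread::JoinHandle<()> {
    std::thread::spawn(move || {
        // Plain blocking capture handle; no tokio involved on this thread.
        let mut cap = Capture::from_device("ens192")
            .unwrap()
            .promisc(true)
            .immediate_mode(true)
            .open()
            .unwrap();

        loop {
            match cap.next_packet() {
                Ok(packet) => {
                    // Copy the packet out and hand it to the async workers.
                    if tx.blocking_send(packet.data.into()).is_err() {
                        break; // all receivers dropped; shut down
                    }
                }
                Err(pcap::Error::TimeoutExpired) => continue,
                Err(_) => break,
            }
        }
    })
}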

Ideas?

Upvotes: 0

Views: 880

Answers (1)

Marvin.Hansen

Reputation: 1638

You have to let go of Tokio and instead work with the LMAX disruptor.

https://lmax-exchange.github.io/disruptor/disruptor.html

Luckily there is a Rust port.

https://github.com/sklose/disrustor

From memory, if you follow the performance instructions and pin each executor to a dedicated core, you can process millions of events per second.

https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results
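
As a rough illustration of the pattern (this is not disrustor's actual builder API, so check its docs for that): a single producer and a single consumer, each pinned to its own core, exchanging packets through a preallocated bounded queue with no async scheduler in between. The core_affinity and crossbeam-channel crates here are stand-ins of my choosing; the real disruptor busy-spins on a ring buffer rather than blocking on a channel.

use std::thread;

fn main() {
    // Pin the producer and consumer to dedicated cores (assumes >= 2 cores).
    let cores = core_affinity::get_core_ids().expect("could not enumerate cores");
    let (producer_core, consumer_core) = (cores[0], cores[1]);

    // Bounded queue standing in for the disruptor's preallocated ring buffer.
    let (tx, rx) = crossbeam_channel::bounded::<Box<[u8]>>(1 << 16);

    let consumer = thread::spawn(move || {
        core_affinity::set_for_current(consumer_core);
        let mut count: u64 = 0;
        // Blocking recv: no async scheduler or task wakeups on the hot path.
        while let Ok(_packet) = rx.recv() {
            count += 1;
            if count % 1_000_000 == 0 {
                println!("processed {count} packets");
            }
        }
    });

    let producer = thread::spawn(move || {
        core_affinity::set_for_current(producer_core);
        // In the real program this would be the pcap capture loop.
        for _ in 0..10_000_000u64 {
            let pkt: Box<[u8]> = Box::new([0u8; 64]);
            if tx.send(pkt).is_err() {
                break;
            }
        }
        // tx is dropped here, which closes the channel and ends the consumer.
    });

    producer.join().unwrap();
    consumer.join().unwrap();
}

The point is that once the hot path is two pinned threads and a fixed-size queue, the per-packet cost is a queue operation rather than a task wakeup.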

Hope that helps.

Upvotes: 0
