Cthulhu

Reputation: 1372

Performance and memory problems in a Rust async program

I wanted to benchmark requests from Rust to a particular service using an async client, so I created an async benchmarker for that.

This function should run a specified number of concurrent workers (actually, parallel chains of futures) for a specified duration and report the number of iterations achieved.

use futures::future;
use futures::prelude::*;
use std::error::Error;
use std::time::{Duration, Instant};
use std::{cell, io, rc};
use tokio::runtime::current_thread::Runtime;
use tokio::timer;

struct Config {
    workers: u32,
    duration: Duration,
}

/// Build infinitely repeating future
fn cycle<'a, F: Fn() -> P + 'a, P: Future + 'a>(
    f: F,
) -> Box<dyn Future<Item = (), Error = P::Error> + 'a> {
    Box::new(f().and_then(move |_| cycle(f)))
}

fn benchmark<'a, F: Fn() -> P + 'a, P: Future<Error = io::Error> + 'a>(
    config: Config,
    f: F,
) -> impl Future<Item = u32, Error = io::Error> + 'a {
    let counter = rc::Rc::new(cell::Cell::new(0u32));
    let f = rc::Rc::new(f);
    future::select_all((0..config.workers).map({
        let counter = rc::Rc::clone(&counter);
        move |_| {
            let counter = rc::Rc::clone(&counter);
            let f = rc::Rc::clone(&f);
            cycle(move || {
                let counter = rc::Rc::clone(&counter);
                f().map(move |_| {
                    counter.set(counter.get() + 1);
                })
            })
        }
    }))
    .map(|((), _, _)| ())
    .map_err(|(err, _, _)| err)
    .select(
        timer::Delay::new(Instant::now() + config.duration)
            .map_err(|err| io::Error::new(io::ErrorKind::Other, err.description())),
    )
    .map(move |((), _)| counter.get())
    .map_err(|(err, _)| err)
}

fn main() {
    let duration = std::env::args()
        .skip(1)
        .next()
        .expect("Please provide duration in seconds")
        .parse()
        .expect("Duration must be integer number");

    let ms = Duration::from_millis(1);

    let mut rt = Runtime::new().expect("Could not create runtime");

    loop {
        let iters = rt
            .block_on(
                benchmark(
                    Config {
                        workers: 65536,
                        duration: Duration::from_secs(duration),
                    },
                    || {
                        // Substitute actual benchmarked call
                        timer::Delay::new(Instant::now() + ms)
                            .map_err(|err| panic!("Failed to set delay: {:?}", err))
                    },
                )
                .map_err(|err| panic!("Benchamrking error: {:?}", err)),
            )
            .expect("Runtime error");
        println!("{} iters/sec", iters as u64 / duration);
    }
}

However, the result this benchmark reports degrades and memory consumption grows as the benchmark duration increases, e.g. on my PC:

cargo run --release 1 ~ 900k iterations/sec
cargo run --release 2 ~ 700k iterations/sec
cargo run --release 10 ~ 330k iterations/sec

Also, memory usage grows rapidly while the benchmark function runs. I tried using Valgrind to find a memory leak, but it only reports that all allocated memory is still reachable.

How can I fix the issue?

Upvotes: 1

Views: 1907

Answers (2)

Gr&#233;gory OBANOS
Gr&#233;gory OBANOS

Reputation: 1051

It looks like the Boxes returned by cycle are not freed until the end of benchmark: each iteration allocates a new Box through the recursive cycle call, and the enclosing and_then keeps it reachable, so memory allocation / deallocation takes more and more time.
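
For reference, here is the question's cycle again, annotated with the (approximate) shape of the combinator layers that stay alive; this is only a sketch of my reading of the futures 0.1 internals, not something I have profiled:

/// Build infinitely repeating future (annotated copy of the question's version)
fn cycle<'a, F: Fn() -> P + 'a, P: Future + 'a>(
    f: F,
) -> Box<dyn Future<Item = (), Error = P::Error> + 'a> {
    // After n completed iterations the live worker future looks roughly like
    //   Box<AndThen<P, Box<AndThen<P, ... Box<AndThen<P, _, _>> ...>, _>>
    // i.e. n nested Box allocations, all still reachable from the outermost
    // future, which is consistent with Valgrind reporting the memory as
    // "still reachable" rather than leaked.
    Box::new(f().and_then(move |_| cycle(f)))
}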

I have rewritten your program with async/await, without the Box, and the results are now consistent:

#![feature(async_await)]

use futures::{compat::Future01CompatExt, future, prelude::*, select};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tokio::timer;

struct Config {
    workers: u32,
    duration: Duration,
}

// Build infinitely repeating future
async fn cycle<'a, F: Fn() -> P + 'a, P: Future<Output = ()> + 'a>(f: F) {
    loop {
        f().await;
    }
}

async fn benchmark<'a, F: Fn() -> P + 'a, P: Future<Output = ()> + 'a>(
    config: Config,
    f: F,
) -> usize {
    let counter = AtomicUsize::new(0);

    let infinite_counter = future::select_all((0..config.workers).map(|_| {
        cycle(|| {
            f().map(|_| {
                counter.fetch_add(1, Ordering::SeqCst);
            })
        })
        .boxed_local()
    }));

    let timer = timer::Delay::new(Instant::now() + config.duration)
        .compat()
        .unwrap_or_else(|_| panic!("Boom !"));

    select! {
        a = infinite_counter.fuse() => (),
        b = timer.fuse() => (),
    };

    counter.load(Ordering::SeqCst)
}

fn main() {
    let duration = std::env::args()
        .skip(1)
        .next()
        .expect("Please provide duration in seconds")
        .parse()
        .expect("Duration must be integer number");

    let ms = Duration::from_millis(1);

    // Use actix_rt runtime instead of vanilla tokio because I want
    // to restrict to one OS thread and avoid needing async primitives
    let mut rt = actix_rt::Runtime::new().expect("Could not create runtime");

    loop {
        let iters = rt
            .block_on(
                benchmark(
                    Config {
                        workers: 65536,
                        duration: Duration::from_secs(duration),
                    },
                    || {
                        // Substitute actual benchmarked call
                        timer::Delay::new(Instant::now() + ms)
                            .compat()
                            .unwrap_or_else(|_| panic!("Boom !"))
                    },
                )
                .boxed_local()
                .unit_error()
                .compat(),
            )
            .expect("Runtime error");
        println!("{} iters/sec", iters as u64 / duration);
    }
}

It's my first time with futures 0.3, so I don't fully understand some parts, like the select! syntax or boxed_local, but it works!
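
For what it's worth, my reading of the futures 0.3 docs (an assumption, not verified against this exact alpha): select_all requires its futures to be Unpin, and the future returned by an async fn is not, so .boxed_local() pins it on the heap as a LocalBoxFuture (the non-Send sibling of BoxFuture); select! additionally wants FusedFuture, hence the .fuse() calls. A tiny sketch of the boxed_local part:

use futures::future::{FutureExt, LocalBoxFuture};

async fn tick() {}

// The future produced by an async fn is !Unpin; boxing it pins it on the
// heap so Unpin-requiring combinators such as select_all can drive it.
fn pinned() -> LocalBoxFuture<'static, ()> {
    tick().boxed_local()
}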


EDIT: Here is the dependencies block from Cargo.toml

[dependencies]
futures-preview = { version = "0.3.0-alpha", features = ["nightly", "compat", "async-await"] }
tokio = "0.1.22"
actix-rt = "0.2.3"

Upvotes: 4

Cthulhu

Reputation: 1372

So it turns out cycle really was the culprit, as Grégory suspected. I found a useful function in the futures crate, loop_fn, and rewrote cycle using it:

/// Build infinitely repeating future
fn cycle<'a, F: Fn() -> P + 'a, P: Future + 'a>(
    f: F,
) -> impl Future<Item = (), Error = P::Error> + 'a {
    future::loop_fn((), move |_| f().map(|_| future::Loop::Continue(())))
}

The rest of the code remains the same. Now this compiles with stable Rust and even reports almost twice as many iterations per second as the proposed nightly futures solution (for what it's worth with this synthetic test).
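
As far as I can tell, this fixes the problem because loop_fn drives a single fixed-size LoopFn state machine and overwrites the same state slot on every pass, so nothing accumulates across iterations. A minimal standalone sketch of the Continue/Break protocol (futures 0.1, with a hypothetical count_to helper that is not part of the benchmark):

use futures::future::{self, Loop};
use futures::Future;

// Resolves with n after n passes through the loop body; every pass reuses
// the same LoopFn storage instead of stacking a new boxed future layer.
fn count_to(n: u32) -> impl Future<Item = u32, Error = ()> {
    future::loop_fn(0u32, move |i| {
        future::ok::<_, ()>(if i < n {
            Loop::Continue(i + 1)
        } else {
            Loop::Break(i)
        })
    })
}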

Upvotes: 2
