Reputation: 1372
I wanted to benchmark requests from rust to particular service using async client, and created async benchmarker for that.
This function should run a specified number of concurrent workers (actually, parallel chains of futures) for a specified duration and report the count of iterations achieved.
use futures::future;
use futures::prelude::*;
use std::error::Error;
use std::time::{Duration, Instant};
use std::{cell, io, rc};
use tokio::runtime::current_thread::Runtime;
use tokio::timer;
/// Benchmark parameters.
struct Config {
// Number of concurrent chains of futures to run.
workers: u32,
// Wall-clock time to keep the benchmark running.
duration: Duration,
}
/// Build infinitely repeating future
fn cycle<'a, F: Fn() -> P + 'a, P: Future + 'a>(
f: F,
) -> Box<dyn Future<Item = (), Error = P::Error> + 'a> {
Box::new(f().and_then(move |_| cycle(f)))
}
/// Run `config.workers` concurrent chains of the future produced by `f`
/// for `config.duration`, resolving to the total number of iterations
/// completed across all chains (or to the first worker error).
fn benchmark<'a, F: Fn() -> P + 'a, P: Future<Error = io::Error> + 'a>(
config: Config,
f: F,
) -> impl Future<Item = u32, Error = io::Error> + 'a {
// Shared iteration counter. Rc<Cell> (not Arc/atomic) is sufficient
// here because this is driven by the current_thread runtime in main,
// which polls everything on a single OS thread.
let counter = rc::Rc::new(cell::Cell::new(0u32));
// The factory itself is shared between all workers via Rc.
let f = rc::Rc::new(f);
// Start `workers` infinite chains and race them with select_all.
// A cycle never resolves Ok, so select_all can only complete via
// the error path below.
future::select_all((0..config.workers).map({
let counter = rc::Rc::clone(&counter);
move |_| {
// Fresh Rc handles for each worker's closure.
let counter = rc::Rc::clone(&counter);
let f = rc::Rc::clone(&f);
cycle(move || {
// Clone again per iteration so the inner `map` closure
// can own a handle.
let counter = rc::Rc::clone(&counter);
f().map(move |_| {
counter.set(counter.get() + 1);
})
})
}
}))
// Collapse select_all's (value, index, remaining) tuples.
.map(|((), _, _)| ())
.map_err(|(err, _, _)| err)
// Race the worker pool against the duration timer; the timer is the
// branch that normally ends the benchmark.
.select(
timer::Delay::new(Instant::now() + config.duration)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.description())),
)
// On completion, report the accumulated iteration count.
.map(move |((), _)| counter.get())
.map_err(|(err, _)| err)
}
fn main() {
let duration = std::env::args()
.skip(1)
.next()
.expect("Please provide duration in seconds")
.parse()
.expect("Duration must be integer number");
let ms = Duration::from_millis(1);
let mut rt = Runtime::new().expect("Could not create runtime");
loop {
let iters = rt
.block_on(
benchmark(
Config {
workers: 65536,
duration: Duration::from_secs(duration),
},
|| {
/// Substitute actual benchmarked call
timer::Delay::new(Instant::now() + ms)
.map_err(|err| panic!("Failed to set delay: {:?}", err))
},
)
.map_err(|err| panic!("Benchamrking error: {:?}", err)),
)
.expect("Runtime error");
println!("{} iters/sec", iters as u64 / duration);
}
}
However, the result this benchmark reports and memory consumption degrades with increase of benchmark duration, e.g. on my pc:
cargo run --release 1
~ 900k iterations/sec
cargo run --release 2
~ 700k iterations/sec
cargo run --release 10
~ 330k iterations/sec
Also, memory usage rapidly grows as the benchmark function runs. I tried using valgrind
to find the memory leak, but it only reports that all allocated memory can still be reached.
How can I fix the issue?
Upvotes: 1
Views: 1907
Reputation: 1051
It looks like the `Box` returned by `cycle` is not freed until the end of `benchmark`, and the memory allocation / deallocation takes more and more time.
I have rewritten your program with `async_await`, without the `Box`, and the results are now consistent:
#![feature(async_await)]
use futures::{compat::Future01CompatExt, future, prelude::*, select};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tokio::timer;
/// Benchmark parameters.
struct Config {
// Number of concurrent chains of futures to run.
workers: u32,
// Wall-clock time to keep the benchmark running.
duration: Duration,
}
// Build infinitely repeating future
async fn cycle<'a, F: Fn() -> P + 'a, P: Future<Output = ()> + 'a>(f: F) {
loop {
f().await;
}
}
/// Run `config.workers` concurrent chains of the future produced by `f`
/// for `config.duration` and return the total iteration count.
async fn benchmark<'a, F: Fn() -> P + 'a, P: Future<Output = ()> + 'a>(
config: Config,
f: F,
) -> usize {
// Shared iteration counter; atomic so it can be touched from every
// worker closure without Rc/RefCell plumbing. SeqCst is the
// conservative default — contention is not a concern since the
// runtime below is restricted to one OS thread (see main).
let counter = AtomicUsize::new(0);
// Each worker is an infinite cycle that bumps the counter once per
// completed call. boxed_local erases the per-worker future type so
// select_all gets a homogeneous collection.
let infinite_counter = future::select_all((0..config.workers).map(|_| {
cycle(|| {
f().map(|_| {
counter.fetch_add(1, Ordering::SeqCst);
})
})
.boxed_local()
}));
// tokio 0.1 timer bridged into a std future via compat(); the error
// branch should be unreachable in practice.
let timer = timer::Delay::new(Instant::now() + config.duration)
.compat()
.unwrap_or_else(|_| panic!("Boom !"));
// Race the never-ending workers against the timer; only the timer
// arm can complete, ending the benchmark.
select! {
a = infinite_counter.fuse() => (),
b = timer.fuse() => (),
};
counter.load(Ordering::SeqCst)
}
fn main() {
let duration = std::env::args()
.skip(1)
.next()
.expect("Please provide duration in seconds")
.parse()
.expect("Duration must be integer number");
let ms = Duration::from_millis(1);
// Use actix_rt runtime instead of vanilla tokio because I want
// to restrict to one OS thread and avoid needing async primitives
let mut rt = actix_rt::Runtime::new().expect("Could not create runtime");;
loop {
let iters = rt
.block_on(
benchmark(
Config {
workers: 65536,
duration: Duration::from_secs(duration),
},
|| {
// Substitute actual benchmarked call
timer::Delay::new(Instant::now() + ms)
.compat()
.unwrap_or_else(|_| panic!("Boom !"))
},
)
.boxed_local()
.unit_error()
.compat(),
)
.expect("Runtime error");
println!("{} iters/sec", iters as u64 / duration);
}
}
It's my first time with futures 0.3, so I don't really get some parts, like the `select!` syntax or `boxed_local`, but it works!
EDIT: Here is the dependencies block from Cargo.toml
[dependencies]
futures-preview = { version = "0.3.0-alpha", features = ["nightly", "compat", "async-await"] }
tokio = "0.1.22"
actix-rt = "0.2.3"
Upvotes: 4
Reputation: 1372
So it turns out `cycle` really was the culprit, as Gregory suspected. I found this useful function in the futures crate: `loop_fn`, and rewrote `cycle` using it:
/// Build infinitely repeating future.
///
/// `loop_fn` drives the repetition with a single constant-size state
/// machine, unlike the earlier recursive `Box::new(f().and_then(...))`
/// version, whose chain of boxed futures was only freed at the end of
/// `benchmark` — the cause of the growing memory use and degrading
/// iteration rate.
fn cycle<'a, F: Fn() -> P + 'a, P: Future + 'a>(
f: F,
) -> impl Future<Item = (), Error = P::Error> + 'a {
future::loop_fn((), move |_| f().map(|_| future::Loop::Continue(())))
}
The rest of the code remains the same. Now this compiles with stable Rust and even reports almost twice as many iterations per second as the proposed nightly futures solution (for what it's worth with this synthetic test).
Upvotes: 2