Reputation: 5123
I am trying to write a tokio event loop that can perform GET requests against the same server, with the following characteristics:
In my tries so far, I've managed to get different combinations of the 4 items working, but never all together. My main problem is that I don't quite understand how I can add new futures to the tokio event loop.
I assume I need to use loop_fn for the main loop that polls the receiver, and handle.spawn to spawn new tasks? handle.spawn only accepts futures that resolve to Result<(),()>, so I can't use its output to respawn a job on failure. Does that mean I need to move the retry check into that future?
Below is an attempt that accepts and processes urls in batch (so no continuous polling), and has a timeout (but no retry):
fn place_dls(&mut self, reqs: Vec<String>) {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let timeout = Timeout::new(Duration::from_millis(5000), &handle).unwrap();
    let send_dls = stream::iter_ok::<_, reqwest::Error>(reqs.iter().map(|o| {
        // send with request through an async reqwest client in self
    }));
    // Allow as many concurrent downloads as there are requests in the batch
    let rec_dls = send_dls.buffer_unordered(reqs.len()).for_each(|n| {
        n.into_body().concat2().and_then(|full_body| {
            debug!("Received: {:#?}", full_body);
            // TODO: how to put the download back in the queue if failure code is received?
        })
    });
    let work = rec_dls.select2(timeout).then(|res| match res {
        Ok(Either::A((got, _timeout))) => {
            Ok(got)
        },
        Ok(Either::B((_timeout_error, _get))) => {
            // TODO: put back in queue
            Err(io::Error::new(
                io::ErrorKind::TimedOut,
                "Client timed out while connecting",
            ).into())
        }
        Err(Either::A((get_error, _timeout))) => Err(get_error.into()),
        Err(Either::B((timeout_error, _get))) => Err(timeout_error.into()),
    });
    core.run(work);
}
My attempt with loop_fn was sadly unsuccessful.
Upvotes: 1
Views: 1378
Reputation: 2701
"I assume I need to use loop_fn for the main loop"
I'd suggest a slightly different approach: implement a consumer of the futures::sync::mpsc::Receiver stream instead of a loop.
It can be viewed as a kind of master process: after receiving a URL via the Receiver, a tokio task is spawned to download its contents. Retrying is then straightforward: just send the failed or timed-out URL back to the master channel via its Sender endpoint.
Here is a working code sketch:
extern crate futures;
extern crate tokio;
use std::{io, time::{Duration, Instant}};
use futures::{
    Sink,
    Stream,
    stream,
    sync::mpsc,
    future::Future,
};
use tokio::{
    runtime::Runtime,
    timer::{Delay, Timeout},
};
fn main() -> Result<(), io::Error> {
    let mut rt = Runtime::new()?;
    let executor = rt.executor();
    let (tx, rx) = mpsc::channel(0);
    let master_tx = tx.clone();
    // The "master": spawns one download task per URL received on the channel
    let master = rx.for_each(move |url: String| {
        let download_future = download(&url)
            .map(|_download_contents| {
                // TODO: actually do something with the contents
                ()
            });
        let timeout_future =
            Timeout::new(download_future, Duration::from_millis(2000));
        let job_tx = master_tx.clone();
        let task = timeout_future
            .or_else(move |error| {
                // either a download error or a timeout: retry by re-queueing the URL
                println!("retrying {} because of {:?}", url, error);
                job_tx.send(url).map(|_| ()).map_err(|_| ())
            });
        executor.spawn(task);
        Ok(())
    });
    rt.spawn(master);
    let urls = vec![
        "http://url1".to_string(),
        "http://url2".to_string(),
        "http://url3".to_string(),
    ];
    rt.executor()
        .spawn(tx.send_all(stream::iter_ok(urls)).map(|_| ()).map_err(|_| ()));
    rt.shutdown_on_idle().wait()
        .map_err(|()| io::Error::new(io::ErrorKind::Other, "shutdown failure"))
}
#[derive(Debug)]
struct DownloadContents;
#[derive(Debug)]
struct DownloadError;
fn download(url: &str) -> Box<dyn Future<Item = DownloadContents, Error = DownloadError> + Send> {
    // TODO: actually download here
    match url {
        // url2 always fails
        "http://url2" => {
            println!("FAILED downloading: {}", url);
            let future = Delay::new(Instant::now() + Duration::from_millis(1000))
                .map_err(|_| DownloadError)
                .and_then(|()| Err(DownloadError));
            Box::new(future)
        },
        // url3 always times out (5000 ms delay vs. the 2000 ms timeout in main)
        "http://url3" => {
            println!("TIMEOUT downloading: {}", url);
            let future = Delay::new(Instant::now() + Duration::from_millis(5000))
                .map_err(|_| DownloadError)
                .and_then(|()| Ok(DownloadContents));
            Box::new(future)
        },
        // everything else succeeds
        _ => {
            println!("SUCCESS downloading: {}", url);
            let future = Delay::new(Instant::now() + Duration::from_millis(50))
                .map_err(|_| DownloadError)
                .and_then(|()| Ok(DownloadContents));
            Box::new(future)
        },
    }
}
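Note that the sketch above re-queues url2 and url3 forever, so the runtime never becomes idle. One way to bound this is to send an attempt counter through the channel together with the URL. The following is only a rough illustration of that idea, not tokio code: it uses the blocking std::sync::mpsc channel on a single thread so that it runs without extra crates, and Job, fake_download, run_queue and max_attempts are hypothetical names made up for this example.

```rust
use std::sync::mpsc;

/// Hypothetical queue item: a URL plus the number of attempts already made.
#[derive(Debug)]
struct Job {
    url: String,
    attempts: u32,
}

/// Stand-in for a real download (assumption): "http://url2" always fails.
fn fake_download(url: &str) -> Result<(), String> {
    if url == "http://url2" {
        Err(format!("failed: {}", url))
    } else {
        Ok(())
    }
}

/// Drains the queue, re-sending failed jobs until `max_attempts` is reached.
/// Returns the URLs that succeeded and the URLs that were given up on.
fn run_queue(urls: &[&str], max_attempts: u32) -> (Vec<String>, Vec<String>) {
    let (tx, rx) = mpsc::channel::<Job>();
    for url in urls {
        tx.send(Job { url: url.to_string(), attempts: 0 }).unwrap();
    }

    // Jobs that have neither succeeded nor been given up on yet.
    let mut pending = urls.len();
    let mut done = Vec::new();
    let mut gave_up = Vec::new();

    while pending > 0 {
        // Every pending job is in the channel, so this never blocks forever.
        let job = rx.recv().unwrap();
        match fake_download(&job.url) {
            Ok(()) => {
                done.push(job.url);
                pending -= 1;
            }
            // Attempts left: put the job back at the end of the queue.
            Err(_) if job.attempts + 1 < max_attempts => {
                tx.send(Job { url: job.url, attempts: job.attempts + 1 }).unwrap();
            }
            // Attempt budget exhausted: stop retrying this URL.
            Err(_) => {
                gave_up.push(job.url);
                pending -= 1;
            }
        }
    }
    (done, gave_up)
}

fn main() {
    let (done, gave_up) = run_queue(&["http://url1", "http://url2", "http://url3"], 3);
    println!("done: {:?}, gave up on: {:?}", done, gave_up);
}
```

In the tokio version the same counter could travel through the futures channel as a (String, u32) tuple or a small struct, with the or_else closure deciding whether to send the job back or drop it.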
Upvotes: 2