Reputation: 389
My main goal is to write an API server that retrieves part of its information from an external API server. That external server is quite fragile, so I would like to limit the global number of concurrent requests made to it, for example to 10 or 20.
Thus, my idea was to write something like an HttpPool, which consumes tasks via a crossbeam bounded channel and distributes them among tokio tasks. The idea was to use a bounded channel to avoid publishing too much work, and to use a fixed set of tasks to limit the number of concurrent requests to the external API.
It seems to work if I do not create more than 8 tasks. If I define more, it blocks after fetching the first tasks from the queue.
use std::{error::Error, result::Result};
use tokio::sync::oneshot::Sender;
use tokio::time::timeout;
use tokio::time::{sleep, Duration};
use crossbeam_channel;

#[derive(Debug)]
struct HttpTaskRequest {
    url: String,
    result: Sender<String>,
}

type PoolSender = crossbeam_channel::Sender<HttpTaskRequest>;
type PoolReceiver = crossbeam_channel::Receiver<HttpTaskRequest>;

#[derive(Debug)]
struct HttpPool {
    size: i32,
    sender: PoolSender,
    receiver: PoolReceiver,
}

impl HttpPool {
    fn new(capacity: i32) -> Self {
        let (tx, rx) = crossbeam_channel::bounded::<HttpTaskRequest>(capacity as usize);
        HttpPool {
            size: capacity,
            sender: tx,
            receiver: rx,
        }
    }

    async fn start(self) -> Result<HttpPool, Box<dyn Error>> {
        for i in 0..self.size {
            let task_receiver = self.receiver.clone();
            tokio::spawn(async move {
                loop {
                    match task_receiver.recv() {
                        Ok(request) => {
                            if request.result.is_closed() {
                                println!("Task[{i}] received url {} already closed by receiver, seems to reach timeout already", request.url);
                            } else {
                                println!("Task[{i}] started to work {:?}", request.url);
                                let resp = reqwest::get("https://httpbin.org/ip").await;
                                println!("Resp: {:?}", resp);
                                println!("Done Send request for url {}", request.url);
                                request.result.send("Result".to_owned()).expect("Failed to send result");
                            }
                        }
                        Err(err) => println!("Error: {err}"),
                    }
                }
            });
        }
        Ok(self)
    }

    pub async fn request(&self, url: String) -> Result<(), Box<dyn Error>> {
        let (os_sender, os_receiver) = tokio::sync::oneshot::channel::<String>();
        let request = HttpTaskRequest {
            result: os_sender,
            url: url.clone(),
        };
        self.sender.send(request).expect("Failed to publish message to task group");
        // check if a timeout or value was returned
        match timeout(Duration::from_millis(100), os_receiver).await {
            Ok(res) => {
                println!("Request finished without reaching the timeout {}", res.unwrap());
            }
            Err(_) => {
                println!("Request {url} run into timeout");
            }
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let http_pool = HttpPool::new(20).start().await.expect("Failed to start http pool");
    for i in 0..10 {
        let url = format!("T{}", i.to_string());
        http_pool.request(url).await.expect("Failed to request message");
    }
    loop {}
}
Maybe somebody can explain why the code blocks? Is it related to tokio::spawn?
I guess my attempt is wrong, so please let me know if there is another way to handle it. The goal can be summarized like this: I would like to request URLs and process them in such a way that no more than N concurrent requests are made against the API server.
I have read this question: How can I perform parallel asynchronous HTTP GET requests with reqwest?. But in that answer the work is known up front, which is not the case in my example. My requests arrive on the fly, hence I am not sure how to handle them.
Upvotes: 0
Views: 827
Reputation: 389
I have finally solved the mystery about the blocking in my code example above. As we can see, I used the crate crossbeam_channel, which does not cooperate with async code. If we call recv on this type of channel, the thread blocks until a message is received. Hence there is no way to return to the tokio scheduler, which means that no other task can run on that worker thread. As a reminder, async code only yields to the scheduler when it reaches an .await.
This also explains why the code worked when we spawned fewer tasks than worker threads. By default, the number of worker threads equals the CPU core count, in my case eight. Hence, when I started more tasks than that, all worker threads were blocked and the application froze.
The fix was to replace the crate crossbeam-channel with async-channel, as stated on the tokio tutorial page.
In case my answer is vague, I recommend reading the following posts:
Upvotes: 2