Coldchain9

Reputation: 1745

Using tokio::sync::Semaphore to limit async requests in a block

I am working with an API that limits me to 40 requests per second, and 200 every 120 seconds.

I am currently designing an async pattern in Rust using reqwest and tokio. I want to incorporate the rate limiting constraints. I have done similar things in Python using Semaphores and have looked into semaphores in Rust, but am not quite sure how to structure my code.

Ideally, I'd like to:

  1. Send in batches of 40 requests (never more than 40 per second)
  2. Once I've sent 200 requests before the 120-second window has elapsed, stop and wait out the remainder of that window. Hitting a 429 incurs a 120-second wait, so the goal is to fill the bucket up to that limit and then pause until I can begin sending requests again (a rough sketch of this timing follows the list).
  3. After all requests are finished, collect the responses in a Vec
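Roughly, the timing I have in mind is sketched below. This is untested and only illustrates the 40-per-second / 200-per-120-seconds bookkeeping; send_one is just a placeholder for the real reqwest call:

use tokio::time::{sleep, Duration, Instant};

// Placeholder for the real request; it just echoes the URL back.
async fn send_one(url: String) -> String {
    url
}

async fn run_batches(requests: Vec<String>) -> Vec<String> {
    let mut responses = Vec::with_capacity(requests.len());
    let mut sent_in_window = 0usize;
    let mut window_start = Instant::now();

    // Send the URLs in chunks of at most 40, no more than one chunk per second.
    for chunk in requests.chunks(40) {
        // If this chunk would push us past 200 requests inside the current
        // 120-second window, wait for the window to expire first.
        if sent_in_window + chunk.len() > 200 {
            let elapsed = window_start.elapsed();
            if elapsed < Duration::from_secs(120) {
                sleep(Duration::from_secs(120) - elapsed).await;
            }
            sent_in_window = 0;
            window_start = Instant::now();
        }

        // Fire off the whole chunk concurrently and collect its responses.
        let handles: Vec<_> = chunk
            .iter()
            .cloned()
            .map(|url| tokio::spawn(send_one(url)))
            .collect();
        for handle in handles {
            responses.push(handle.await.unwrap());
        }
        sent_in_window += chunk.len();

        // Wait at least a second before starting the next chunk.
        sleep(Duration::from_secs(1)).await;
    }

    responses
}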

I'm curious about other thoughts and ideas on how best to handle this. I've read several other questions about this type of situation but haven't found something that works yet. I'm also completely new to async/await in Rust, so any refactoring advice helps.

The current async pattern looks like this:

use reqwest::header::HeaderMap;

async fn _make_requests(
    headers: &HeaderMap,
    requests: &[String],
) -> Result<Vec<String>, Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();

    // Each req is a string URL which will pull back the response text from the API
    for req in requests {
        let client = client.clone();
        // Clone what the spawned task needs; the task must own ('static) its data
        let headers = headers.clone();
        let req = req.clone();

        tokio::spawn(async move {
            let result: Result<String, reqwest::Error> = async move {
                let resp = client
                    .get(req)
                    .headers(headers)
                    .send()
                    .await?
                    .text()
                    .await?;
                Ok(resp)
            }
            .await;

            // Handle resp status in match
            match result {
                Ok(resp) => println!("{:?}", resp),
                Err(e) => eprintln!("{}", e),
            }
        });
    }

    // TODO: collect the responses instead of just printing them
    Ok(Vec::new())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create sample headers
    let mut headers = HeaderMap::new();
    headers.insert("Accept", "application/json".parse().unwrap());

    // Placeholder list of request URLs
    let requests = vec!["https://example.com".to_string()];

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    // Run the event loop until complete
    rt.block_on(_make_requests(&headers, &requests))?;

    Ok(())
}

Upvotes: 1

Views: 1262

Answers (1)

Jonas Wolf

Reputation: 1214

You are almost there:

use std::sync::Arc;

use reqwest::header::HeaderMap;
use tokio::sync::Semaphore;

async fn _make_requests(
    headers: &HeaderMap,
    requests: &[String],
) -> Result<Vec<String>, Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    // Create a semaphore with 40 permits (important: wrapped in an Arc)
    let semaphore = Arc::new(Semaphore::new(40));

    // Each req is a string URL which will pull back the response text from the API
    for req in requests {
        let client = client.clone();
        let headers = headers.clone();
        let req = req.clone();

        // Important is the acquire_owned() here: the owned permit can be moved
        // into the spawned task and is released when it is dropped
        let semaphore = Arc::clone(&semaphore);
        let permit = semaphore.acquire_owned().await.unwrap();

        tokio::spawn(async move {
            let result: Result<String, reqwest::Error> = async move {
                let resp = client
                    .get(req)
                    .headers(headers)
                    .send()
                    .await?
                    .text()
                    .await?;
                Ok(resp)
            }
            .await;

            // Release the permit so the next waiting request can start
            drop(permit);

            // Handle resp status in match
            match result {
                Ok(resp) => println!("{:?}", resp),
                Err(e) => eprintln!("{}", e),
            }
        });
    }

    // TODO: collect the responses (see the note below)
    Ok(Vec::new())
}

I would also suggest pushing the handles returned from tokio::spawn into a Vec and awaiting them after your for loop, so you can collect the responses.
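A minimal sketch of that pattern, with a dummy fake_request function standing in for the reqwest call (the function name and the 40-permit count are assumptions carried over from above):

use std::sync::Arc;
use tokio::sync::Semaphore;

// Placeholder for the real request; it just echoes the URL back.
async fn fake_request(url: String) -> String {
    url
}

async fn run_all(requests: Vec<String>) -> Vec<String> {
    let semaphore = Arc::new(Semaphore::new(40));
    let mut handles = Vec::with_capacity(requests.len());

    for req in requests {
        // At most 40 tasks hold a permit (and therefore run) at once
        let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
        // Keep the JoinHandle instead of discarding it
        handles.push(tokio::spawn(async move {
            let resp = fake_request(req).await;
            drop(permit); // release the permit for the next task
            resp
        }));
    }

    // Await every task after the loop and collect the results
    let mut responses = Vec::with_capacity(handles.len());
    for handle in handles {
        responses.push(handle.await.unwrap());
    }
    responses
}

Note that handle.await returns a Result because the task can panic; unwrap is fine for a sketch, but in real code you would probably log or propagate that error.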

This should give you at most 40 spawned tokio tasks running in parallel at any one time.

Upvotes: 1
