Reputation: 1745
I am making async requests to an API and intending to send the response on a channel to do other things with (like insert into a DB). However, it seems that my Receiver
isn't reading from the channel properly. I know the requests are succeeding and data is deserialized properly, but I am seeing no action in my receiver.
Code below using jsonplaceholder free API
use futures::stream::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Post {
pub user_id: u16,
pub id: u16,
pub title: String,
pub body: String,
}
#[tokio::main]
async fn main() {
// create iterator that will stream async responses
let client = reqwest::Client::new();
let resps = futures::stream::iter(1..)
.map(|i| {
let client = &client;
async move {
let url = format!("https://jsonplaceholder.typicode.com/posts/{i}");
client.get(url).send().await
}
})
.buffer_unordered(100);
let (tx, mut rx) = mpsc::channel::<Post>(50);
resps
.for_each(|r| async {
match r {
Ok(resp) => {
// deserialize response
let tx_cloned = tx.clone();
let res = resp.json::<Post>().await.unwrap();
// send response on the channel
let _ = tx_cloned.send(res).await.unwrap();
}
Err(e) => {
println!("error consuming responses, {e}!");
}
}
})
.await;
// consume responses from our channel to do future things with results...
while let Some(r) = rx.recv().await {
println!("{:?}", r);
}
}
Upvotes: 0
Views: 56