Coldchain9
Coldchain9

Reputation: 1745

Tokio Channel isn't Receiving from Streamed Futures

I am making async requests to an API and intending to send the response on a channel to do other things with (like insert into a DB). However, it seems that my Receiver isn't reading from the channel properly. I know the requests are succeeding and data is deserialized properly, but I am seeing no action in my receiver.

Code below using jsonplaceholder free API

use futures::stream::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;

#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Post {
    pub user_id: u16,
    pub id: u16,
    pub title: String,
    pub body: String,
}

#[tokio::main]
async fn main() {
    // create iterator that will stream async responses
    let client = reqwest::Client::new();
    let resps = futures::stream::iter(1..)
        .map(|i| {
            let client = &client;
            async move {
                let url = format!("https://jsonplaceholder.typicode.com/posts/{i}");
                client.get(url).send().await
            }
        })
        .buffer_unordered(100);

    let (tx, mut rx) = mpsc::channel::<Post>(50);
    resps
        .for_each(|r| async {
            match r {
                Ok(resp) => {
                    // deserialize response
                    let tx_cloned = tx.clone();
                    let res = resp.json::<Post>().await.unwrap();

                    // send response on the channel
                    let _ = tx_cloned.send(res).await.unwrap();
                }
                Err(e) => {
                    println!("error consuming responses, {e}!");
                }
            }
        })
        .await;

    // consume responses from our channel to do future things with results...
    while let Some(r) = rx.recv().await {
        println!("{:?}", r);
    }
}

Upvotes: 0

Views: 56

Answers (1)

Sibear Jo
Sibear Jo

Reputation: 108

Since you don't spawn any task, all your actions are done sequentially, so you try to send an infinite number of queries before you start receiving from the channel.

Try running the for_each() loop in a separate task.

Upvotes: 1

Related Questions