Kevin
Kevin

Reputation: 3348

Process elements in futures::stream::Stream in parallel and simultaneously append new elements

I am trying to "connect" to multiple peers and "process" them in parallel. I have the following implementation:

/// Stand-in for real per-peer work: asynchronously waits one millisecond and
/// then prints the address that was "processed".
pub async fn process(addr: &str) {
    // The sleep simulates processing time and is the only await point.
    let simulated_work = core::time::Duration::from_millis(1);
    tokio::time::sleep(simulated_work).await;
    println!("processed {}", addr);
}

/// Connects to and processes every peer in a fixed list, driving all of the
/// per-peer futures concurrently.
#[tokio::main]
async fn main() {
    // The list is only iterated, never modified, so an immutable binding suffices.
    let peers = vec!["127.0.0.1", "139.48.123.146", "123.123.46.209"];

    // One future per peer; nothing runs until the combined future is awaited.
    let conn_fut = futures::future::join_all(peers.iter().map(|peer| {
        async move {
            println!("connecting to {}", peer);
            process(peer).await;
        }
    }));

    // Polls all per-peer futures concurrently and resolves once every one is done.
    conn_fut.await;
}

playground

output:

connecting to 127.0.0.1
connecting to 139.48.123.146
connecting to 123.123.46.209
processed 127.0.0.1
processed 139.48.123.146
processed 123.123.46.209

The peers are handled concurrently: as the output shows, connecting to e.g. 139.48.123.146 starts before the processing of 127.0.0.1 has finished.

In addition to this I want to add new connections to conn_fut while I am awaiting all futures in parallel. I think something like this would work:

/// Simulates handling the peer at `addr` by sleeping for one millisecond
/// (asynchronously) and then reporting the address on stdout.
pub async fn process(addr: &str) {
    // Placeholder for the real processing cost.
    let pause = core::time::Duration::from_millis(1);
    tokio::time::sleep(pause).await;
    println!("processed {}", addr);
}

/// Builds two independent batches of peer connections and drives both batches
/// at the same time.
#[tokio::main]
async fn main() {
    use futures::future::{join, join_all};

    /// Announces the connection attempt, then processes the peer.
    async fn connect(peer: &str) {
        println!("connecting to {}", peer);
        process(peer).await;
    }

    let peers = vec!["127.0.0.1", "139.48.123.146", "123.123.46.209"];
    let new_peers = vec!["123.0.0.1", "124.0.0.1"];

    // Each batch becomes a single future that completes when all of its peers are done.
    let conn_fut = join_all(peers.iter().map(|peer| connect(peer)));
    let new_conn_fut = join_all(new_peers.iter().map(|peer| connect(peer)));

    // Both batches are polled together; neither waits for the other to finish.
    join(conn_fut, new_conn_fut).await;
}

playground

output:

connecting to 127.0.0.1
connecting to 139.48.123.146
connecting to 123.123.46.209
connecting to 123.0.0.1
connecting to 124.0.0.1
processed 127.0.0.1
processed 139.48.123.146
processed 123.123.46.209
processed 123.0.0.1
processed 124.0.0.1

But, since I am retrieving the addresses from another asynchronous process I have this:

/// Mock peer handler: yields to the runtime for one millisecond, then prints
/// which address has been processed.
pub async fn process(addr: &str) {
    // Sleeping stands in for the actual work done per peer.
    let work_time = core::time::Duration::from_millis(1);
    tokio::time::sleep(work_time).await;
    println!("processed {}", addr);
}

/// Demonstrates the problem: the peers known up front are handled
/// concurrently, but peers that arrive over the channel are handled one at a
/// time.
#[tokio::main]
async fn main() {
    let peers = vec!["127.0.0.1", "139.48.123.146", "123.123.46.209"];

    // One future per initial peer, combined into a single future by `join_all`.
    let conn_fut = futures::future::join_all(peers.iter().map(|peer| {
        async move {
            println!("connecting to {}", peer);
            process(peer).await;
        }
    }));
    
    // Channel (capacity 100) over which additional peers are delivered.
    let (tx, mut rx) = tokio::sync::mpsc::channel(100);
    
    // Consumer side of the channel; takes ownership of `rx`.
    let handle_conn_fut = async move {
        // Each received peer is fully processed before `recv` is called again,
        // so peers coming from the channel never overlap with one another.
        while let Some(peer) = rx.recv().await {
            println!("connecting to {}", peer);
            process(peer).await;
        }
    };
    
    // Producer side of the channel; takes ownership of `tx` and sends two more peers.
    let create_new_conn_fut = async move {
        for peer in ["123.0.0.1", "124.0.0.1"] {
            tx.send(peer).await.unwrap();
        }
    };

    // The three futures are driven together, but the peers received inside
    // `handle_conn_fut` are still processed sequentially (see the loop above).
    futures::future::join3(conn_fut, handle_conn_fut, create_new_conn_fut).await;
}

playground

output:

connecting to 127.0.0.1
connecting to 139.48.123.146
connecting to 123.123.46.209
connecting to 123.0.0.1
processed 127.0.0.1
processed 139.48.123.146
processed 123.123.46.209
processed 123.0.0.1
connecting to 124.0.0.1
processed 124.0.0.1

This does not process the new peers in parallel. I am not sure how to write this. I have looked into using futures::stream; as far as I can tell, a tokio::sync::mpsc::Receiver can be converted into a stream with tokio_stream::wrappers::ReceiverStream. The futures::stream::StreamExt trait provides for_each_concurrent, which can process the elements of a stream in parallel.

I am not sure what the best way is to go about this. Intuitively, I want to write something like this:

// NOTE(review): illustrative sketch of the desired API rather than working
// code. Intended meaning: take a stream of peer addresses and run `process`
// on every item concurrently.
async fn connect(peers: futures::stream::Stream<&str>) {
    peers.for_each_concurrent(|p| process(p).await);
}

// NOTE(review): illustrative sketch of the intended usage (statements lack
// terminating semicolons). Idea: start processing an initial stream of peers
// and, while that is running, append further peers to the same stream.
#[tokio::main]
async fn main() {
  let mut initial_peers = vec!["127.0.0.1", "139.48.123.123"];
  let peers = futures::stream::iter(initial_peers)
  let conn_fut = connect(peers)
  let new_peers_fut = async move {
      for new_peer in ["123.0.0.1", "124.0.0.1"] {
          // Wished-for operation: add a peer to the stream already handed to `connect`.
          peers.push_back(new_peer)
      }
  }
  // Both the consumer and the producer are meant to make progress together.
  futures::future::join(conn_fut, new_peers_fut).await
}

Is it possible to process the elements in a futures::stream::Stream in parallel and simultaneously append new elements to the end of the stream which in turn are also processed together with the other elements?

Upvotes: 0

Views: 702

Answers (1)

Kevin
Kevin

Reputation: 3348

I solved this by turning the initial peers into a futures::stream (instead of building conn_fut) and chaining that stream with rx:

use futures::stream::StreamExt;

/// Simulated peer processing: waits one millisecond asynchronously, then
/// prints the processed address.
pub async fn process(addr: &str) {
    // The delay imitates work; replace with real processing as needed.
    let delay = core::time::Duration::from_millis(1);
    tokio::time::sleep(delay).await;
    println!("processed {}", addr);
}

/// Processes a fixed set of initial peers together with peers that arrive
/// later over a channel, handling all of them concurrently as one stream.
#[tokio::main]
async fn main() {
    let peers = vec!["127.0.0.1", "139.48.123.146", "123.123.46.209"];
    let peers = futures::stream::iter(peers);

    let (tx, rx) = tokio::sync::mpsc::channel(100);

    // Yield the initial peers first, then whatever is sent through the channel.
    let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
    let all_peers = peers.chain(rx);

    // `None` places no limit on the number of peers handled at once. A literal
    // `0` has the same meaning but reads as "no concurrency".
    let handle_conn_fut = all_peers.for_each_concurrent(None, |peer| async move {
        println!("connecting to {}", peer);
        process(peer).await;
    });

    // `tx` is moved into this block and dropped when the block completes; that
    // closes the channel, which ends the stream and lets `handle_conn_fut` finish.
    let create_new_conn_fut = async move {
        for peer in ["123.0.0.1", "124.0.0.1"] {
            tx.send(peer).await.unwrap();
        }
    };

    // Drives the consumer and the producer concurrently until both are done.
    futures::future::join(handle_conn_fut, create_new_conn_fut).await;
}

output:

connecting to 127.0.0.1
connecting to 139.48.123.146
connecting to 123.123.46.209
connecting to 123.0.0.1
connecting to 124.0.0.1
processed 127.0.0.1
processed 139.48.123.146
processed 123.123.46.209
processed 123.0.0.1
processed 124.0.0.1

Upvotes: 1

Related Questions