kai
kai

Reputation: 2175

Send to each futures::sync::mpsc::Sender in array

I have a dynamic collection of futures::sync::mpsc::Sender, and I would like to send a message to each of them for every incoming connection.

I have it working with UnboundedSender, because unbounded_send can be called through a shared reference to the sender (see below). Sender::send, however, consumes the sender, so I would need to remove it from the Vec and reinsert it after sending. How can I do this? If a Sender would block, it should not send any more messages, but instead switch to handling the incoming connections on the receiver.

The UnboundedSender implementation is below; my failed attempt at doing it otherwise is commented out inline (just replace the preceding line with the commented-out one).

UnboundedSender (works)

extern crate tokio;
use tokio::runtime::current_thread;

extern crate futures;
use futures::{stream, Stream, Sink};
use futures::sync::mpsc;

/// Drives a fixed script of `i8` commands through a stream on the
/// current-thread runtime, using unbounded channels.
///
/// Command meaning:
/// * `0`  - open a new unbounded channel, keep its sender, and spawn a task
///   that prints everything arriving on the receiver;
/// * `-1` - empty the sender list;
/// * anything else - push the value to every sender currently stored.
fn main() {
    let script: Vec<i8> = vec![1, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1];
    let mut senders: Vec<mpsc::UnboundedSender<i8>> = Vec::new();

    let stream = stream::iter_ok::<_, ()>(script).for_each(|command| {
        if command == 0 {
            println!("Adding channel");
            let (tx, rx) = mpsc::unbounded();
            senders.push(tx);
            // The receiving side lives in its own task and just logs items.
            let drain = rx.for_each(|item| {
                println!("Received {}", item);
                Ok(())
            });
            current_thread::spawn(drain);
        } else if command == -1 {
            println!("Closing channels");
            senders.clear();
        } else {
            // `unbounded_send` works through a shared reference, so the
            // senders can stay in the Vec while we iterate over it.
            for tx in &senders {
                println!("Sending {}", command);
                tx.unbounded_send(command).unwrap();
            }
        }
        Ok(())
    });

    current_thread::block_on_all(stream).expect("Failed to run stream");
    println!("Done!");
}

Sender (doesn't work)

extern crate tokio;
use tokio::runtime::current_thread;

extern crate futures;
use futures::{stream, Stream, Sink};
use futures::sync::mpsc;

/// Bounded-`Sender` variant of the demo. This version is the question's
/// failing attempt: it is rejected by the compiler at the `sender.send(x)`
/// line below (E0507).
///
/// Command meaning in `values`:
/// * `0`  - create a channel with `mpsc::channel(1)`, store its sender, and
///   spawn a task that prints every received value;
/// * `-1` - empty the sender list;
/// * anything else - attempt to send the value on every stored sender.
fn main() {
    let values = vec![1 as i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1];
    let mut senders = Vec::<mpsc::Sender<i8>>::new();
    let stream = stream::iter_ok::<_, ()>(values)
        .for_each(|v| {
            match v {
                0 => {
                    println!("Adding channel");
                    let (sender, receiver) = mpsc::channel(1);
                    senders.push(sender);
                    // The receiver is drained by its own spawned task, which
                    // only logs what arrives.
                    current_thread::spawn(receiver.for_each(|v| {
                        println!("Received {}", v);
                        Ok(())
                    }))

                },
                -1 => {
                    println!("Closing channels");
                    // Emptying the Vec drops every stored sender.
                    senders.clear();
                },
                x => {
                    // `iter()` only hands out references to the senders.
                    for sender in senders.iter() {
                        println!("Sending {}", x);
                        // `send` takes the sender by value, which is not
                        // possible through the reference obtained above; the
                        // value it returns is also discarded here.
                        sender.send(x);
                        //^error[E0507]: cannot move out of borrowed content
                    }
                },
            }
            Ok(())
        });

    current_thread::block_on_all(stream)
        .expect("Failed to run stream");
    println!("Done!");
}

Upvotes: 1

Views: 1823

Answers (2)

kai
kai

Reputation: 2175

I think I've worked it out - the trick is to pass senders in and keep passing it down the chain of futures. This doesn't handle the -1 to clear senders but the extension is straightforward.

extern crate tokio;
use tokio::runtime::current_thread;

extern crate futures;
use futures::{stream, Stream, Sink, Future, IntoFuture};
use futures::sync::mpsc;
use futures::future::Either;

/// Threads the list of bounded senders through a `fold` over the command
/// stream, so every step owns the senders, may consume them while sending,
/// and hands them back for the next step.
///
/// Command meaning:
/// * `0` - open a new channel, store its sender, and spawn a logging task for
///   the receiver;
/// * anything else - send the value through every sender, one after another,
///   and collect the senders returned by the completed sends.
///
/// Clearing the senders on `-1` is not handled here.
fn main() {
    let script: Vec<i8> = vec![0, 1, 0, 2, 3];

    let pipeline = stream::iter_ok::<Vec<i8>, mpsc::SendError<i8>>(script)
        .fold(Vec::new(), |mut senders, command| {
            // NOTE: the channel-creating branch must stay first; it is what
            // tells the compiler the element type of `senders` before the
            // send branch calls a method on those elements.
            if command == 0 {
                println!("Adding channel");
                let (tx, rx) = mpsc::channel(0);
                senders.push(tx);
                // 1-based label: taken after the push.
                let channel_no = senders.len();
                let drain = rx.for_each(move |item| {
                    println!("Received {} in channel {}", item, channel_no);
                    Ok(())
                });
                current_thread::spawn(drain);
                // Nothing to wait for: hand the senders straight back.
                Either::A(Ok(senders).into_future())
            } else {
                println!("Sending {}...", command);
                // Each send consumes its sender and yields it again on
                // completion; `collect` rebuilds the Vec for the next step.
                let fan_out = stream::iter_ok(senders)
                    .and_then(move |tx| tx.send(command))
                    .collect();
                Either::B(fan_out.map(move |returned| {
                    println!("Sent {}.", command);
                    returned
                }))
            }
        })
        .map(|_| ());

    current_thread::block_on_all(pipeline).expect("Failed to run stream");
    println!("Done!");
}

This outputs:

Adding channel
Sending 1...
Received 1 in channel 1
Sent 1.
Adding channel
Sending 2...
Received 2 in channel 1
Received 2 in channel 2
Sent 2.
Sending 3...
Received 3 in channel 1
Received 3 in channel 2
Sent 3.
Done!

Upvotes: 0

Stargateur
Stargateur

Reputation: 26717

AFAIK, you have two main problems: send() takes ownership of the Sender, so you have to clone it somewhere if you want to reuse it later, and it also returns a future that you have to process somehow.

There are different ways to fix these problems; here is one:

extern crate futures;
extern crate tokio;

use futures::sync::mpsc;
use futures::Future;
use futures::{stream, Sink, Stream};

/// Runs the command script on the tokio runtime using bounded channels,
/// cloning a sender for every message and spawning each send as a task.
///
/// Command meaning:
/// * `0`  - create a channel with `mpsc::channel(1)`, keep its sender, and
///   spawn a task that prints every received value;
/// * `-1` - empty the sender list;
/// * anything else - for every stored sender, clone it and spawn a task that
///   sends the value through the clone.
fn main() {
    // A typed literal (`1i8`) replaces the cast, and the element type of
    // `senders` is left to inference.
    let script = vec![1i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1];
    let mut senders = Vec::new();

    // `move` transfers ownership of `senders` into the closure.
    let stream = stream::iter_ok(script).for_each(move |command| {
        if command == 0 {
            println!("Adding channel");
            let (tx, rx) = mpsc::channel(1);
            senders.push(tx);
            let drain = rx.for_each(|item| {
                println!("Received {}", item);
                Ok(())
            });
            tokio::spawn(drain);
        } else if command == -1 {
            println!("Closing channels");
            senders.clear();
        } else {
            for tx in &senders {
                // `send` consumes the sender, so send through a clone and
                // keep the original in the Vec.
                let delivery = tx.clone().send(command);
                // The message is printed once the send future completes;
                // failures are reported on stderr.
                let task = delivery
                    .map(move |_| println!("Sending {}", command))
                    .map_err(|e| eprintln!("error = {:?}", e));
                // The send future has to be driven by someone: run it as
                // its own task. The loop does not wait for it to finish.
                tokio::spawn(task);
            }
        }
        Ok(())
    });

    tokio::run(stream);
    println!("Done!");
}

Upvotes: 1

Related Questions