Reputation: 2175
I have a dynamic collection of `futures::sync::mpsc::Sender`, and I would like to send a message to each of them for every incoming connection.
I have it working with `UnboundedSender`, because I can just do that (see below), but `Sender` consumes itself, so I need to remove and reinsert it into the `Vec` after sending. How can I do this? If the `Sender` blocks, it should not send more messages, but instead switch to handling the incoming connections on the receiver.
The `UnboundedSender` implementation is below; my failed attempt at doing it otherwise is commented out inline (just replace the preceding line with the commented-out one).
extern crate tokio;
use tokio::runtime::current_thread;
extern crate futures;
use futures::{stream, Stream, Sink};
use futures::sync::mpsc;
/// Fans a scripted sequence of values out to a growing set of unbounded
/// channels, all driven on the current thread.
///
/// The input doubles as a tiny command language:
/// * `0` registers a new channel and spawns a task that prints what it receives,
/// * `-1` drops every sender collected so far,
/// * any other value is sent to every registered channel.
fn main() {
    let inputs = vec![1_i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1];
    let mut outlets: Vec<mpsc::UnboundedSender<i8>> = Vec::new();

    let driver = stream::iter_ok::<_, ()>(inputs).for_each(|item| {
        if item == 0 {
            println!("Adding channel");
            let (tx, rx) = mpsc::unbounded();
            outlets.push(tx);
            // The receiving half gets its own task that just logs each value.
            current_thread::spawn(rx.for_each(|got| {
                println!("Received {}", got);
                Ok(())
            }));
        } else if item == -1 {
            println!("Closing channels");
            outlets.clear();
        } else {
            // `unbounded_send` only needs `&self`, so the senders can stay in
            // the Vec and simply be borrowed here.
            for tx in &outlets {
                println!("Sending {}", item);
                tx.unbounded_send(item).unwrap();
            }
        }
        Ok(())
    });

    current_thread::block_on_all(driver).expect("Failed to run stream");
    println!("Done!");
}
extern crate tokio;
use tokio::runtime::current_thread;
extern crate futures;
use futures::{stream, Stream, Sink};
use futures::sync::mpsc;
// Failing variant of the program above: identical control flow, but using the
// bounded `mpsc::Sender` (created with `mpsc::channel(1)`) instead of
// `UnboundedSender`. This listing is intentionally left non-compiling; it
// exists to show the error the question is about.
fn main() {
// Scripted input: 0 adds a channel, -1 drops all senders, anything else is
// broadcast to every registered sender.
let values = vec![1 as i8, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1];
let mut senders = Vec::<mpsc::Sender<i8>>::new();
let stream = stream::iter_ok::<_, ()>(values)
.for_each(|v| {
match v {
0 => {
println!("Adding channel");
let (sender, receiver) = mpsc::channel(1);
senders.push(sender);
// The receiving half is handed to its own task, which prints each value.
current_thread::spawn(receiver.for_each(|v| {
println!("Received {}", v);
Ok(())
}))
},
-1 => {
println!("Closing channels");
senders.clear();
},
x => {
// `senders.iter()` only yields `&Sender<i8>` references.
for sender in senders.iter() {
println!("Sending {}", x);
sender.send(x);
//^error[E0507]: cannot move out of borrowed content
// As the question puts it, `Sender` "consumes itself" when sending, so the
// call above tries to move the sender out from behind the shared reference
// produced by `iter()`; that is what the compiler rejects.
// Note also that the value returned by `send` is discarded here; the answer
// below points out that it is a future which still has to be processed.
}
},
}
Ok(())
});
current_thread::block_on_all(stream)
.expect("Failed to run stream");
println!("Done!");
}
Upvotes: 1
Views: 1823
Reputation: 2175
I think I've worked it out - the trick is to pass `senders` in and keep passing it down the chain of futures. This doesn't handle the `-1` to clear `senders`, but the extension is straightforward.
extern crate tokio;
use tokio::runtime::current_thread;
extern crate futures;
use futures::{stream, Stream, Sink, Future, IntoFuture};
use futures::sync::mpsc;
use futures::future::Either;
/// Broadcasts values over bounded channels by threading the `Vec` of senders
/// through `Stream::fold` instead of capturing it in a closure.
///
/// A `0` in the input registers a new channel; every other value is sent to
/// all channels registered so far. Because the `Vec` is the fold accumulator,
/// each step owns the senders, can hand them to `send` by value, and gets
/// them back once the sends have completed.
fn main() {
    let inputs = vec![0, 1, 0, 2, 3];

    let pipeline = stream::iter_ok::<Vec<i8>, mpsc::SendError<i8>>(inputs)
        .fold(Vec::new(), |mut outlets, item| {
            // The two branches produce different future types, so each is
            // wrapped in one side of `Either`.
            if item == 0 {
                println!("Adding channel");
                let (tx, rx) = mpsc::channel(0);
                outlets.push(tx);
                // Taken after the push, so channels are numbered from 1.
                let channel_no = outlets.len();
                current_thread::spawn(rx.for_each(move |got| {
                    println!("Received {} in channel {}", got, channel_no);
                    Ok(())
                }));
                // Nothing to wait for: pass the accumulator straight on.
                Either::A(Ok(outlets).into_future())
            } else {
                println!("Sending {}...", item);
                // Move the senders into a stream, send on each one in turn,
                // and collect the results back into a `Vec` that becomes the
                // next accumulator.
                let resend_all = stream::iter_ok(outlets)
                    .and_then(move |tx| tx.send(item))
                    .collect()
                    .map(move |outlets| {
                        println!("Sent {}.", item);
                        outlets
                    });
                Either::B(resend_all)
            }
        })
        .map(drop);

    current_thread::block_on_all(pipeline).expect("Failed to run stream");
    println!("Done!");
}
This outputs:
Adding channel
Sending 1...
Received 1 in channel 1
Sent 1.
Adding channel
Sending 2...
Received 2 in channel 1
Received 2 in channel 2
Sent 2.
Sending 3...
Received 3 in channel 1
Received 3 in channel 2
Sent 3.
Done!
Upvotes: 0
Reputation: 26717
AFAIK, you have two main problems: `send()` takes ownership of the `Sender`, so you have to clone it somewhere if you want to reuse it later, and it also returns a future that you have to process somehow.
There are different ways to fix these problems; here is one:
extern crate futures;
extern crate tokio;
use futures::sync::mpsc;
use futures::Future;
use futures::{stream, Sink, Stream};
/// Broadcasts values over bounded channels by cloning each sender and
/// spawning every individual send as its own task.
///
/// Input protocol: `0` registers a new channel, `-1` drops all stored
/// senders, and any other value is sent to every registered channel.
fn main() {
    // The element type is given once on the binding; the literals need no cast.
    let inputs: Vec<i8> = vec![1, 2, 0, 1, 2, 3, 0, 1, 2, 3, -1];
    // Element type is inferred from the `push` below.
    let mut outlets = Vec::new();

    // `move` hands ownership of `outlets` to the closure.
    let driver = stream::iter_ok(inputs).for_each(move |item| {
        if item == 0 {
            println!("Adding channel");
            let (tx, rx) = mpsc::channel(1);
            outlets.push(tx);
            tokio::spawn(rx.for_each(|got| {
                println!("Received {}", got);
                Ok(())
            }));
        } else if item == -1 {
            println!("Closing channels");
            outlets.clear();
        } else {
            for tx in &outlets {
                // `send` consumes the sender, so it is given a clone and the
                // original stays in the Vec for the next value.
                let owned_tx = tx.clone();
                // The message is printed from `map`, i.e. only once the send
                // future has resolved successfully.
                let delivery = owned_tx
                    .send(item)
                    .map(move |_| println!("Sending {}", item))
                    .map_err(|e| eprintln!("error = {:?}", e));
                // The send future is run as a separate task.
                tokio::spawn(delivery);
            }
        }
        Ok(())
    });

    tokio::run(driver);
    println!("Done!");
}
Upvotes: 1