Reputation: 10970
I'm trying to create a basic TCP server. This is what I've got in my main function:
    let (server_tx, server_rx) = mpsc::unbounded();
    let state = Arc::new(Mutex::new(Shared::new(server_tx)));
    let addr = "127.0.0.1:6142".parse().unwrap();
    let listener = TcpListener::bind(&addr).unwrap();
    let server = listener.incoming().for_each(move |socket| {
        // Spawn a task to process the connection
        process(socket, state.clone());
        Ok(())
    }).map_err(|err| {
        println!("accept error = {:?}", err);
    });
    println!("server running on localhost:6142");
    let _messages = server_rx.for_each(|_| {
        // process messages here
        Ok(())
    }).map_err(|err| {
        println!("message error = {:?}", err);
    });
    tokio::run(server);
I'm using the chat.rs example from the tokio repository as a base. I'm sending data to server_tx on incoming TCP messages.
What I have trouble with is consuming them.
I'm "consuming" the incoming message stream using server_rx.for_each(|_| { ... }). Now, how do I tell tokio to run it? tokio::run accepts a single future, but I have two (and possibly more). How do I combine them so they run in parallel?
Upvotes: 0
Views: 3172
Reputation: 18993
Join the futures together:
    let messages = server_rx.for_each(|_| {
        println!("Message broadcasted");
        Ok(())
    }).map_err(|err| {
        println!("message error = {:?}", err);
    });
    // Run both futures on the same runtime; map() discards the ((), ()) result.
    tokio::run(server.join(messages).map(|_| ()));
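The map() combinator is needed because the Item associated type of Join is the tuple ((), ()), while tokio::run() accepts only a future whose Future::Item is (). join() also requires both futures to share the same Error type, which the two map_err() calls provide by mapping each error to ().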
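To make the type reasoning concrete, here is a minimal std-only sketch that models futures 0.1-style combinators. It is not the real futures or tokio API: Async, Future, Join, Map, Countdown and run are simplified stand-ins defined in the snippet itself (no error type, no task wake-ups, and run simply polls in a loop). It only illustrates that joining two futures yields a tuple, and that a run function restricted to Item = () needs the map(|_| ()) step.

```rust
// Toy model only: these types are hypothetical stand-ins, not the `futures` crate.

/// Result of one poll: a finished value, or "not done yet".
enum Async<T> {
    Ready(T),
    NotReady,
}

trait Future: Sized {
    type Item;
    fn poll(&mut self) -> Async<Self::Item>;

    /// Drive `self` and `other` together; done once both are done.
    fn join<B: Future>(self, other: B) -> Join<Self, B> {
        Join { a: self, b: other, a_out: None, b_out: None }
    }

    /// Transform the final value with `f`.
    fn map<U, F: FnOnce(Self::Item) -> U>(self, f: F) -> Map<Self, F> {
        Map { inner: self, f: Some(f) }
    }
}

struct Join<A: Future, B: Future> {
    a: A,
    b: B,
    a_out: Option<A::Item>,
    b_out: Option<B::Item>,
}

impl<A: Future, B: Future> Future for Join<A, B> {
    // The joined future produces a tuple of both results.
    type Item = (A::Item, B::Item);

    fn poll(&mut self) -> Async<Self::Item> {
        // Poll each side only until it has produced its value.
        if self.a_out.is_none() {
            if let Async::Ready(v) = self.a.poll() {
                self.a_out = Some(v);
            }
        }
        if self.b_out.is_none() {
            if let Async::Ready(v) = self.b.poll() {
                self.b_out = Some(v);
            }
        }
        if self.a_out.is_some() && self.b_out.is_some() {
            Async::Ready((self.a_out.take().unwrap(), self.b_out.take().unwrap()))
        } else {
            Async::NotReady
        }
    }
}

struct Map<A, F> {
    inner: A,
    f: Option<F>,
}

impl<A: Future, U, F: FnOnce(A::Item) -> U> Future for Map<A, F> {
    type Item = U;

    fn poll(&mut self) -> Async<U> {
        match self.inner.poll() {
            Async::Ready(v) => {
                let f = self.f.take().expect("polled after completion");
                Async::Ready(f(v))
            }
            Async::NotReady => Async::NotReady,
        }
    }
}

/// A future that becomes ready with `()` after `remaining` unfinished polls.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Item = ();

    fn poll(&mut self) -> Async<()> {
        if self.remaining == 0 {
            Async::Ready(())
        } else {
            self.remaining -= 1;
            Async::NotReady
        }
    }
}

/// Stand-in for `tokio::run`: only accepts futures whose Item is `()`.
/// Returns how many times the future was polled.
fn run<F: Future<Item = ()>>(mut future: F) -> u32 {
    let mut polls = 0;
    loop {
        polls += 1;
        if let Async::Ready(()) = future.poll() {
            return polls;
        }
    }
}

fn main() {
    let server = Countdown { remaining: 2 };
    let messages = Countdown { remaining: 4 };
    // `run(server.join(messages))` would not compile: its Item is ((), ()).
    let polls = run(server.join(messages).map(|_| ()));
    println!("both futures finished after {} polls", polls);
}
```

In this toy model both futures make progress on every poll of the joined future, which is the same interleaving idea that lets server and messages run side by side on one tokio runtime.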
Upvotes: 4