Leonti
Leonti

Reputation: 10970

Start multiple threads with Tokio

I'm trying to create a basic tcp server:

  1. Server should be able to broadcast a stream of messages to all of the connected clients
  2. Server should be able to receive commands from all clients and process them

This is what I've got in my main function:

let (server_tx, server_rx) = mpsc::unbounded();
let state = Arc::new(Mutex::new(Shared::new(server_tx)));

let addr = "127.0.0.1:6142".parse().unwrap();

let listener = TcpListener::bind(&addr).unwrap();

let server = listener.incoming().for_each(move |socket| {
    // Spawn a task to process the connection
    process(socket, state.clone());
    Ok(())
}).map_err(|err| {
    println!("accept error = {:?}", err);
});

println!("server running on localhost:6142");

let _messages = server_rx.for_each(|_| {
    // process messages here
    Ok(())
}).map_err(|err| {
    println!("message error = {:?}", err);
});

tokio::run(server);  

(playground)

I'm using the chat.rs example from tokio repository as a base.
I'm sending data to server_tx on incoming tcp messages.
What I have trouble with is consuming them.
I'm "consuming" incoming message stream using server_rx.for_each(|_| {, now, how do I tell tokio to run it?

tokio::run accepts a single future, but I have 2 (and possibly more). How do I combine them so they run in parallel?

Upvotes: 0

Views: 3172

Answers (1)

attdona
attdona

Reputation: 18993

Joins the futures together:

let messages = server_rx.for_each(|_| {
    println!("Message broadcasted");
    Ok(())
}).map_err(|err| {
    println!("accept error = {:?}", err);
});

tokio::run(server.join(messages).map(|_| ()));

The map() combinator is needed because Join Item associated type is a tuple ((), ()) and tokio::run() consume a future task that require a Future::Item of type ()

Upvotes: 4

Related Questions