Sasha Kondrashov

Reputation: 4570

How can I share or avoid sharing a websocket resource between two threads?

I am using tungstenite to build a chat server, and the way I want to do it relies on having many threads that communicate with each other through mpsc. I want to start up a new thread for each user that connects to the server and connect them to a websocket, and also have that thread be able to read from mpsc so that the server can send messages out through that connection.

The problem is that the mpsc read blocks the thread, but I can't block the thread if I also want to be reading from the websocket on it. The only workaround I could think of is to make two threads, one for inbound and one for outbound messages, but that requires me to share my websocket connection with both workers, which of course I cannot do.

Here's a heavily truncated version of my code where I try to make two workers in the Action::Connect arm of the match statement, which gives error[E0382]: use of moved value: 'websocket' for trying to move it into the second worker's closure:

extern crate tungstenite;
extern crate workerpool;

use std::net::{TcpListener, TcpStream};
use std::sync::mpsc::{self, Sender, Receiver};
use workerpool::Pool;
use workerpool::thunk::{Thunk, ThunkWorker};
use tungstenite::server::accept;

pub enum Action {
  Connect(TcpStream),
  Send(String),
}

fn main() {
  let (main_send, main_receive): (Sender<Action>, Receiver<Action>) = mpsc::channel();
  let worker_pool = Pool::<ThunkWorker<()>>::new(8);
  {
    // spawn thread to listen for users connecting to the server
    let main_send = main_send.clone();
    worker_pool.execute(Thunk::of(move || {
      let listener = TcpListener::bind(format!("127.0.0.1:{}", 8080)).unwrap();
      for (_, stream) in listener.incoming().enumerate() {
        main_send.send(Action::Connect(stream.unwrap())).unwrap();
      }
    }));
  }

  let mut users: Vec<Sender<String>> = Vec::new();

  // process actions from children
  while let Some(act) = main_receive.recv().ok() {
    match act {
      Action::Connect(stream) => {
        let mut websocket = accept(stream).unwrap();
        let (user_send, user_receive): (Sender<String>, Receiver<String>) = mpsc::channel();
        let main_send = main_send.clone();

        // thread to read user input and propagate it to the server
        worker_pool.execute(Thunk::of(move || {
          loop {
            let message = websocket.read_message().unwrap().to_string();
            main_send.send(Action::Send(message)).unwrap();
          }
        }));

        // thread to take server output and propagate it to the user
        worker_pool.execute(Thunk::of(move || {
          while let Some(message) = user_receive.recv().ok() {
            websocket.write_message(tungstenite::Message::Text(message.clone())).unwrap();
          }
        }));
        users.push(user_send);
      }
      Action::Send(message) => {
        // take user message and echo to all users
        for user in &users {
          user.send(message.clone()).unwrap();
        }
      }
    }
  }
}

If I create just one thread for both input and output in that arm, then user_receive.recv() blocks the thread, so I can't read any messages with websocket.read_message() until I get an mpsc message from the main thread. How can I solve both problems? I considered cloning the websocket, but it doesn't implement Clone, and I don't know whether making a new connection with the same stream is a reasonable thing to try; it seems hacky.

Upvotes: 1

Views: 1745

Answers (1)

MaxV

Reputation: 2810

The problem is that the mpsc read blocks the thread

You can use try_recv to avoid blocking the thread. Another implementation of mpsc-style channels is crossbeam_channel. That project is recommended as a replacement even by the author of mpsc.
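To make the try_recv suggestion concrete, here is a minimal sketch using only std::sync::mpsc. The names drain_pending and poll_once are hypothetical helpers, not part of tungstenite or the question's code, and the read_inbound and write_outbound closures are stand-ins for the websocket read and write calls. It assumes the socket read can also be made non-blocking, which the question's code does not do.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Drains every message currently queued on `rx` without blocking.
/// Returns the drained messages and a flag that is true once every
/// sender has been dropped (hypothetical helper for illustration).
fn drain_pending(rx: &Receiver<String>) -> (Vec<String>, bool) {
    let mut pending = Vec::new();
    loop {
        match rx.try_recv() {
            // A message was waiting: collect it and keep draining.
            Ok(message) => pending.push(message),
            // Nothing queued right now: return immediately instead of blocking.
            Err(TryRecvError::Empty) => return (pending, false),
            // All senders are gone: the caller can shut this user down.
            Err(TryRecvError::Disconnected) => return (pending, true),
        }
    }
}

/// One iteration of a single-threaded per-user loop (hypothetical helper).
/// `read_inbound` stands in for a non-blocking websocket read and
/// `write_outbound` stands in for the websocket write; both are assumptions.
fn poll_once<R, W>(
    rx: &Receiver<String>,
    mut read_inbound: R,
    mut write_outbound: W,
) -> (Option<String>, bool)
where
    R: FnMut() -> Option<String>,
    W: FnMut(String),
{
    // Outbound: forward whatever the server queued for this user.
    let (pending, disconnected) = drain_pending(rx);
    for message in pending {
        write_outbound(message);
    }
    // Inbound: check once for a message from the user, without waiting.
    (read_inbound(), disconnected)
}
```

A worker would call poll_once in a loop, forward any inbound message to the main channel, and stop when the flag is true. Because neither side waits, the loop should sleep briefly between iterations (for example with std::thread::sleep) to avoid spinning a CPU core. The underlying TcpStream would also need set_nonblocking(true) before the read side behaves this way.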

I want to start up a new thread for each user that connects to the server

I think the async/await approach will be much better than the thread-per-client one from most perspectives. You can read more about it there.

Upvotes: 1
