codeNoob

Reputation: 333

Can I maintain a Vec of TcpStreams to write to them across threads whenever one of them reads a new input?

I want to do message broadcasting: when one of the clients sends a message, the server writes it to every socket. My main problem is that I can't figure out how to share the Vec with the threads. I can't use a Mutex because that would block other threads from accessing the Vec for reading. I can't clone and send because TcpStream can't be cloned and sent. Here's my attempt so far:

use std::net::{TcpStream, TcpListener};
use std::io::prelude::*;
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc::{channel, Receiver};
use std::cell::RefCell;

type StreamSet = Arc<RefCell<Vec<TcpStream>>>;
type StreamReceiver = Arc<Mutex<Receiver<StreamSet>>>;

fn main() {
    let listener = TcpListener::bind("0.0.0.0:8000").unwrap();
    let mut connection_set: StreamSet = Arc::new(RefCell::new(vec![]));
    let mut id = 0;
    let (tx, rx) = channel();
    let rx = Arc::new(Mutex::new(rx));
    for stream in listener.incoming() {
        let receiver = rx.clone();
        let stream = stream.unwrap();
        (*connection_set).borrow_mut().push(stream);
        println!("A connection established with client {}", id);
        thread::spawn(move || handle_connection(receiver, id));
        id += 1;
        tx.send(connection_set.clone()).unwrap();
    }

}

fn handle_connection(rx: StreamReceiver, id: usize) {
    let streams;
    {
        streams = *(rx.lock().unwrap().recv().unwrap()).borrow();
    }
    let mut connection = &streams[id];
    loop {
        let mut buffer = [0; 512];
        if let Err(_) = connection.read(&mut buffer) {
            break;
        };
        println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
        if let Err(_) = connection.write(&buffer[..]) {
            break;
        };
        if let Err(_) = connection.flush() {
            break;
        };
    }
}

Upvotes: 0

Views: 1505

Answers (1)

Shepmaster

Reputation: 430871

Another idea is to spawn a single "controller" thread plus a thread for every socket. Each worker thread would own its socket and have a channel to send data back to the controller. The controller would own a Vec of channels, one for sending to each worker thread. When a worker thread receives data, it sends it to the controller, which duplicates it and sends it to each worker thread. You can wrap the data in an Arc to prevent unneeded duplication, and you should provide an ID to avoid echoing the data back to the original sender (if you need to).

This moves the ownership completely within a single thread, which should avoid the issues you are experiencing.
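A minimal sketch of that controller layout might look like the following. The names `Event`, `spawn_controller`, `spawn_client`, and `serve` are made up for illustration. It also assumes each socket gets two threads (one blocked on reading, one blocked on the channel from the controller), which relies on `TcpStream::try_clone` to get a second handle to the same socket.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::thread;

/// Everything the controller can be told (hypothetical type for this sketch).
enum Event {
    /// Client `id` connected; the sender reaches its writer thread.
    Joined(usize, Sender<Arc<[u8]>>),
    /// Client `id` sent these bytes.
    Data(usize, Arc<[u8]>),
    /// Client `id` disconnected.
    Left(usize),
}

/// The controller thread is the only owner of the list of per-client channels.
fn spawn_controller() -> Sender<Event> {
    let (tx, rx) = channel::<Event>();
    thread::spawn(move || {
        let mut clients: Vec<(usize, Sender<Arc<[u8]>>)> = Vec::new();
        for event in rx {
            match event {
                Event::Joined(id, out) => clients.push((id, out)),
                Event::Left(id) => clients.retain(|(cid, _)| *cid != id),
                Event::Data(from, bytes) => {
                    // Forward to everyone except the sender; a failed send means
                    // that client's writer thread is gone, so drop its entry.
                    clients.retain(|(cid, out)| {
                        *cid == from || out.send(Arc::clone(&bytes)).is_ok()
                    });
                }
            }
        }
    });
    tx
}

/// Starts the reader and writer threads for one accepted socket.
fn spawn_client(id: usize, stream: TcpStream, to_controller: Sender<Event>) -> std::io::Result<()> {
    let mut writer = stream.try_clone()?; // second handle to the same socket
    let mut reader = stream;
    let (out_tx, out_rx) = channel::<Arc<[u8]>>();
    let _ = to_controller.send(Event::Joined(id, out_tx));

    // Writer: waits for data from the controller and writes it to the socket.
    thread::spawn(move || {
        for bytes in out_rx {
            if writer.write_all(&bytes).is_err() {
                break;
            }
        }
    });

    // Reader: forwards whatever the client sends to the controller.
    thread::spawn(move || {
        let mut buffer = [0u8; 512];
        loop {
            match reader.read(&mut buffer) {
                Ok(0) | Err(_) => break, // closed or failed
                Ok(n) => {
                    let bytes: Arc<[u8]> = Arc::from(&buffer[..n]);
                    if to_controller.send(Event::Data(id, bytes)).is_err() {
                        break;
                    }
                }
            }
        }
        let _ = to_controller.send(Event::Left(id));
    });
    Ok(())
}

/// Accept loop: hands every new connection to `spawn_client`.
fn serve(listener: TcpListener) -> std::io::Result<()> {
    let to_controller = spawn_controller();
    for (id, stream) in listener.incoming().enumerate() {
        spawn_client(id, stream?, to_controller.clone())?;
    }
    Ok(())
}
```

With this in place, `main` would only need to bind the listener and call `serve(listener)`. No `Vec<TcpStream>` is shared at all; the only thing crossing threads is channel endpoints and `Arc<[u8]>` buffers.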

You may also wish to look into Tokio, which should allow you to do something similar but without the need to spawn threads in a 1-1 mapping.

"I can't use Mutex because that will lock the access of other threads"

You can always try a different locking mechanism such as a RwLock.
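As a rough sketch of the RwLock route (the `Clients` alias and the function names are invented for this example): since `Read` and `Write` are both implemented for `&TcpStream`, broadcasting only needs a read lock, and the write lock is taken just to add or remove a connection. Each entry is wrapped in an `Arc` here so that a thread can block on reading its own socket without holding any lock.

```rust
use std::io::{Read, Write};
use std::net::TcpStream;
use std::sync::{Arc, RwLock};

/// Shared list of (client id, socket); a hypothetical alias for this sketch.
type Clients = Arc<RwLock<Vec<(usize, Arc<TcpStream>)>>>;

/// Adds a socket to the shared list (brief write lock) and returns a handle
/// the caller can keep for reading.
fn register(clients: &Clients, id: usize, stream: TcpStream) -> Arc<TcpStream> {
    let stream = Arc::new(stream);
    clients.write().unwrap().push((id, Arc::clone(&stream)));
    stream
}

/// Writes `bytes` to every client except `from`. Only a read lock is needed,
/// so several threads can broadcast at the same time.
fn broadcast(clients: &Clients, from: usize, bytes: &[u8]) {
    for (id, stream) in clients.read().unwrap().iter() {
        if *id != from {
            // `Write` is implemented for `&TcpStream`, so a shared reference suffices.
            let _ = (&**stream).write_all(bytes);
        }
    }
}

/// Per-connection loop: read from our own socket, broadcast to the others.
fn handle_connection(clients: Clients, id: usize, stream: Arc<TcpStream>) {
    let mut buffer = [0u8; 512];
    loop {
        // No lock is held while blocked in `read`.
        match (&*stream).read(&mut buffer) {
            Ok(0) | Err(_) => break,
            Ok(n) => broadcast(&clients, id, &buffer[..n]),
        }
    }
    // Remove ourselves once the connection is closed (brief write lock).
    clients.write().unwrap().retain(|(cid, _)| *cid != id);
}
```

One caveat with this layout: two threads broadcasting at the same moment can interleave their bytes on a given socket, so if message boundaries matter you would likely want per-socket serialization on top of this.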

"because TcpStream can't be cloned"

Sure it can: TcpStream::try_clone.
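For illustration, here is one way `try_clone` could be used, assuming a shared `Arc<Mutex<Vec<TcpStream>>>` that holds only the cloned write handles (`Writers` and `accept_one` are names invented for this sketch). Each thread owns its original stream for reading, and the mutex is held only for the short time it takes to write a message out, never while blocked in `read`.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::thread;

/// Cloned handles used only for writing; a hypothetical alias for this sketch.
type Writers = Arc<Mutex<Vec<TcpStream>>>;

/// Reads from one client and writes each message to every registered handle
/// (including the sender's own, so the sender sees an echo).
fn handle_connection(mut stream: TcpStream, writers: Writers) {
    let mut buffer = [0u8; 512];
    loop {
        match stream.read(&mut buffer) {
            Ok(0) | Err(_) => break, // closed or failed
            Ok(n) => {
                // Lock only for the duration of the broadcast.
                let mut writers = writers.lock().unwrap();
                // Keep only the handles that could still be written to.
                writers.retain_mut(|w| w.write_all(&buffer[..n]).is_ok());
            }
        }
    }
}

/// Accepts a single connection, stores a cloned handle for writing, and
/// moves the original stream into its own thread for reading.
fn accept_one(listener: &TcpListener, writers: &Writers) -> std::io::Result<thread::JoinHandle<()>> {
    let (stream, _addr) = listener.accept()?;
    writers.lock().unwrap().push(stream.try_clone()?);
    let writers = Arc::clone(writers);
    Ok(thread::spawn(move || handle_connection(stream, writers)))
}
```

A server loop would then just call `accept_one(&listener, &writers)` repeatedly. Note that a cloned handle refers to the same underlying socket, so closing or shutting down one handle affects the other as well.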

Upvotes: 1
