Drew
Drew

Reputation: 8963

Implementing "move" thread semantics

I want to write a function to be called like this:

send("message","address");

Where some other thread that is doing

let k = recv("address");
println!("{}",k);

sees message.

In particular, the message may be large, and so I'd like "move" or "zero-copy" semantics for sending the message.

In C, the solution is something like:

  1. Allocate messages on the heap
  2. Have a global, threadsafe hashmap that maps "address" to some memory location
  3. Write pointers into the memory location on send, and wake up the receiver using a semaphore
  4. Read pointers out of the memory location on receive, and wait on a semaphore to process new messages

But according to another SO question, step #2 "sounds like a bad idea". So I'd like to see a more Rust-idiomatic way to approach this problem.

Upvotes: 0

Views: 98

Answers (1)

huon
huon

Reputation: 102216

You get these sort of move semantics automatically, and get achieve light-weight moves by placing large values into a Box (i.e. allocate them on the heap). Using type ConcurrentHashMap<K, V> = Mutex<HashMap<K, V>>; as the threadsafe hashmap (there's various ways this could be improved), one might have:

use std::collections::{HashMap, RingBuf};
use std::sync::Mutex;

type ConcurrentHashMap<K, V> = Mutex<HashMap<K, V>>;

lazy_static! {
    pub static ref MAP: ConcurrentHashMap<String, RingBuf<String>> = {
        Mutex::new(HashMap::new())
    }
}

fn send(message: String, address: String) {
    MAP.lock()
       // find the place this message goes
       .entry(address)
       .get()
       // create a new RingBuf if this address was empty
       .unwrap_or_else(|v| v.insert(RingBuf::new()))
       // add the message on the back
       .push_back(message)
}
fn recv(address: &str) -> Option<String> {
     MAP.lock()
        .get_mut(address)
        // pull the message off the front
        .and_then(|buf| buf.pop_front())
}

That code is using the lazy_static! macro to achieve a global hashmap (it may be better to use a local object that wraps an Arc<ConcurrentHashMap<...>, fwiw, since global state can make reasoning about program behaviour hard). It also uses RingBuf as a queue, so that messages bank up for a given address. If you only wish to support one message at a time, the type could be ConcurrentHashMap<String, String>, send could become MAP.lock().insert(address, message) and recv just MAP.lock().remove(address).

(NB. I haven't compiled this, so the types may not match up precisely.)

Upvotes: 2

Related Questions