Reputation: 719
I'm trying to implement a buffer with a single consumer and a single producer. I have only used POSIX Semaphores, however, they're not available in Rust and I'm trying to implement a trivial semaphore problem with Rust sync primitives (Mutex
, Condvar
, Barrier
, ...) but I don't want to use channels.
My code behaves too irregularly, with some cases going well and other times it just stops at some number and in other cases it just doesn't start counting.
Things appear to work better if I wait 1 second in the main thread till I send the Condvar
notification but it doesn't guarantee that it's not going to enter a deadlock.
How can this program be fixed? Am I understanding Condvar
s wrong?
use std::thread;
use std::sync::{Arc, Condvar, Mutex};
struct Buffer {
is_data: Mutex<bool>,
is_data_cv: Condvar,
is_space: Mutex<bool>,
is_space_cv: Condvar,
buffer: Mutex<i32>,
}
fn producer(buffer: Arc<Buffer>) {
for i in 0..50 {
loop {
let mut is_space = buffer
.is_space_cv
.wait(buffer.is_space.lock().unwrap())
.unwrap();
if *is_space {
{
let mut hueco = buffer.buffer.lock().unwrap();
*hueco = i;
}
*is_space = false;
{
let mut is_data = buffer.is_data.lock().unwrap();
*is_data = true;
}
buffer.is_data_cv.notify_one();
break;
}
}
}
}
fn consumer(buffer: Arc<Buffer>) {
for i in 0..50 {
loop {
let mut is_data = buffer
.is_data_cv
.wait(buffer.is_data.lock().unwrap())
.unwrap();
if *is_data {
{
let hueco = buffer.buffer.lock().unwrap();
println!("{}", *hueco);
}
*is_data = false;
{
let mut is_space = buffer.is_space.lock().unwrap();
*is_space = true;
}
buffer.is_space_cv.notify_one();
break;
}
}
}
}
fn main() {
let buffer = Arc::new(Buffer {
is_data: Mutex::new(false),
is_data_cv: Condvar::new(),
is_space: Mutex::new(true),
is_space_cv: Condvar::new(),
buffer: Mutex::new(0),
});
let b = buffer.clone();
let p = thread::spawn(move || {
producer(b);
});
let b = buffer.clone();
let c = thread::spawn(move || {
consumer(b);
});
//thread::sleep_ms(1000);
buffer.is_space_cv.notify_one();
c.join();
}
Upvotes: 1
Views: 1744
Reputation: 5789
To improve the concurrency performance, you can add more slots in the buffer. The following example also supports multiple producers & consumers.
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::thread;
const MAX: usize = 10;
struct Buffer {
inner: Mutex<BufferInner>,
fill_cond: Condvar,
empty_cond: Condvar,
}
impl Buffer {
fn new() -> Self {
Buffer {
inner: Mutex::new(BufferInner {
data: [Option::None; MAX],
filled: 0,
used: 0,
count: 0,
}),
fill_cond: Condvar::new(),
empty_cond: Condvar::new(),
}
}
}
struct BufferInner {
data: [Option<i32>; MAX],
filled: usize,
used: usize,
count: usize,
}
impl BufferInner {
fn put(&mut self, value: i32) {
self.data[self.filled] = Some(value);
self.filled = (self.filled + 1) % MAX;
self.count += 1;
}
fn get(&mut self) -> i32 {
let tmp: Option<i32> = self.data[self.used];
self.used = (self.used + 1) % MAX;
self.count -= 1;
tmp.unwrap()
}
}
fn producer(buffer: &Buffer) {
for i in 0..20 {
let mut guard = buffer.inner.lock().unwrap();
while guard.count == MAX {
guard = buffer.empty_cond.wait(guard).unwrap();
}
guard.put(i);
println!("producer: {}", i);
buffer.fill_cond.notify_one();
}
}
fn consumer(buffer: &Buffer) {
for _ in 0..20 {
let mut guard: MutexGuard<BufferInner> = buffer.inner.lock().unwrap();
while guard.count == 0_usize {
guard = buffer.fill_cond.wait(guard).unwrap();
}
let value = guard.get();
println!("consumer: {}", value);
buffer.empty_cond.notify_one();
}
}
fn main() {
let buffer = Arc::new(Buffer::new());
let buffer1 = Arc::clone(&buffer);
let p1 = thread::spawn(move || producer(&buffer));
let c1 = thread::spawn(move || consumer(&buffer1));
p1.join().unwrap();
c1.join().unwrap();
}
Upvotes: 1
Reputation: 432059
I would encourage you to create smaller methods and reuse existing Rust types such as Option
. This will allow you to simplify your code quite a bit — only one Mutex
and one Condvar
:
use std::thread;
use std::sync::{Arc, Condvar, Mutex};
#[derive(Debug, Default)]
struct Buffer {
data: Mutex<Option<i32>>,
data_cv: Condvar,
}
impl Buffer {
fn insert(&self, val: i32) {
let mut lock = self.data.lock().expect("Can't lock");
while lock.is_some() {
lock = self.data_cv.wait(lock).expect("Can't wait");
}
*lock = Some(val);
self.data_cv.notify_one();
}
fn remove(&self) -> i32 {
let mut lock = self.data.lock().expect("Can't lock");
while lock.is_none() {
lock = self.data_cv.wait(lock).expect("Can't wait");
}
let val = lock.take().unwrap();
self.data_cv.notify_one();
val
}
}
fn producer(buffer: &Buffer) {
for i in 0..50 {
println!("p: {}", i);
buffer.insert(i);
}
}
fn consumer(buffer: &Buffer) {
for _ in 0..50 {
let val = buffer.remove();
println!("c: {}", val);
}
}
fn main() {
let buffer = Arc::new(Buffer::default());
let b = buffer.clone();
let p = thread::spawn(move || {
producer(&b);
});
let b = buffer.clone();
let c = thread::spawn(move || {
consumer(&b);
});
c.join().expect("Consumer had an error");
p.join().expect("Producer had an error");
}
If you wanted to have a bit more performance (benchmark to see if it's worth it), you could have Condvar
s for the "empty" and "full" conditions separately:
#[derive(Debug, Default)]
struct Buffer {
data: Mutex<Option<i32>>,
is_empty: Condvar,
is_full: Condvar,
}
impl Buffer {
fn insert(&self, val: i32) {
let mut lock = self.data.lock().expect("Can't lock");
while lock.is_some() {
lock = self.is_empty.wait(lock).expect("Can't wait");
}
*lock = Some(val);
self.is_full.notify_one();
}
fn remove(&self) -> i32 {
let mut lock = self.data.lock().expect("Can't lock");
while lock.is_none() {
lock = self.is_full.wait(lock).expect("Can't wait");
}
let val = lock.take().unwrap();
self.is_empty.notify_one();
val
}
}
Upvotes: 6