Reputation: 1411
I'm implementing a kind of write/store buffer in a Redis-backed library to squash multiple hincrby calls into a single call. The buffer needs to be fully atomic and work across multiple threads.
I'm quite new to thread safety, so: are there any existing libraries or standard ways to implement a global Hash-based buffer/queue that works correctly in threaded environments?
As an example, the buffer hash would work something like this pseudocode:
buffer #=> { :ident1 => { :value_a => 1, :value_b => 4 },
# :ident2 => { :value_a => 2, :value_b => 3 } }
buffer[:ident1][:value_a] #=> 1
# saving merges and increments {:value_a => 2} into buffer[:ident1]
save(:ident1, {:value_a => 2})
buffer[:ident1][:value_a] #=> 3
The idea is that after X number of save calls the buffer is flushed by calling save with each item from the buffer.
Upvotes: 2
Views: 954
Reputation: 303261
In general, the way that you provide access to a global value in a thread-safe manner is to use the built-in Mutex class:
$buffer = {}
$bufflock = Mutex.new
threads = (0..2).map do |i|
  Thread.new do
    puts "Starting Thread #{i}"
    3.times do
      puts "Thread #{i} got: #{$buffer[:foo].inspect}"
      $bufflock.synchronize{ $buffer[:foo] = ($buffer[:foo] || 1) * (i+1) }
      sleep rand
    end
    puts "Ending Thread #{i}"
  end
end
threads.each{ |t| t.join } # Wait for all threads to complete
#=> Starting Thread 0
#=> Thread 0 got: nil
#=> Starting Thread 1
#=> Thread 1 got: 1
#=> Starting Thread 2
#=> Thread 2 got: 2
#=> Thread 1 got: 6
#=> Thread 1 got: 12
#=> Ending Thread 1
#=> Thread 0 got: 24
#=> Thread 2 got: 24
#=> Thread 0 got: 72
#=> Thread 2 got: 72
#=> Ending Thread 0
#=> Ending Thread 2
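Code inside a Mutex#synchronize block runs under mutual exclusion: only one thread at a time can hold $bufflock, so a second thread cannot enter the block until the first has left it. This protects $buffer only if every access that must be consistent goes through the same mutex.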
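Applied to the buffer described in the question, the same idea might look like the sketch below. It is a minimal illustration, not a tested library: the WriteBuffer class, the flush_after option, and the flush block are hypothetical names, and the block stands in for wherever the real Redis call (for example hincrby) would go.

```ruby
# Hypothetical sketch: WriteBuffer, flush_after and the flush block are
# illustrative names, not part of any existing library.
class WriteBuffer
  def initialize(flush_after:, &on_flush)
    @flush_after = flush_after # number of save calls between flushes
    @on_flush    = on_flush    # called with (ident, counts) per buffered item
    @lock        = Mutex.new
    @buffer      = new_buffer
    @calls       = 0
  end

  # Merge values into the buffer, incrementing any existing counters.
  def save(ident, values)
    pending = nil
    @lock.synchronize do
      values.each { |key, n| @buffer[ident][key] += n }
      @calls += 1
      if @calls >= @flush_after
        # Swap in an empty buffer while still holding the lock.
        pending = @buffer
        @buffer = new_buffer
        @calls  = 0
      end
    end
    # Flush outside the lock so slow I/O does not block other threads.
    pending&.each { |id, counts| @on_flush.call(id, counts) }
  end

  private

  # Nested hash whose missing counters start at 0.
  def new_buffer
    Hash.new { |h, k| h[k] = Hash.new(0) }
  end
end

# Usage: the block only accumulates totals here; a real implementation
# would presumably issue one hincrby per (ident, key) pair instead.
flushed      = Hash.new { |h, k| h[k] = Hash.new(0) }
flushed_lock = Mutex.new
buffer = WriteBuffer.new(flush_after: 10) do |ident, counts|
  flushed_lock.synchronize { counts.each { |k, n| flushed[ident][k] += n } }
end

threads = 4.times.map do
  Thread.new { 25.times { buffer.save(:ident1, { :value_a => 1 }) } }
end
threads.each { |t| t.join }
```

Swapping the buffer inside the lock and flushing outside it keeps the critical section short. The trade-off is that increments saved since the last flush stay in memory until the next threshold is reached, so a real implementation would probably also want an explicit flush on shutdown.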
See also: Pure-Ruby concurrent Hash
Upvotes: 5