Shakun

Reputation: 59

GoLang sequential goroutines

I am new to Go and have a use case where operations on values with the same key have to run sequentially, whereas operations on values with different keys can run concurrently.

  1. Imagine data is coming from a streaming connection (in order):
    key_name_1, value_1 
    key_name_2, value_2
    key_name_1, value_1
    
  2. Now, key_name_1 and key_name_2 can be processed by goroutines concurrently.
  3. But the next streamed value (3rd row) is key_name_1 again, so a goroutine should process it only once the earlier operation (1st row) has finished; otherwise it has to wait for the 1st operation to finish before applying its own. For the sake of discussion, assume the operation simply adds the new value to the previous value.

What would be the right way to achieve this in Go with the highest possible performance?


The exact use case: database changes are streamed on a queue. When a value changes, it's important that the operation is applied to another database in the same sequence; otherwise consistency will be affected. Conflicts are rare, but can happen.

Upvotes: 2

Views: 745

Answers (1)

rustyx

Reputation: 85276

As a simple solution for mutual exclusion on a given key, you can use a mutex-protected map of ref-counted locks. It's not optimal under high load, but it might suffice in your case.

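import "sync"

// processLock is the lock for one key; refcount counts holders plus waiters.
type processLock struct {
    mtx      sync.Mutex
    refcount int
}

// locker hands out one processLock per key that is currently in use.
type locker struct {
    mtx   sync.Mutex // guards locks and every refcount
    locks map[string]*processLock
}

// newLocker returns a locker with an initialized map
// (writing to a nil map would panic in acquire).
func newLocker() *locker {
    return &locker{locks: make(map[string]*processLock)}
}

// acquire blocks until the caller holds the lock for key.
func (l *locker) acquire(key string) {
    l.mtx.Lock()
    lk, found := l.locks[key]
    if !found {
        lk = &processLock{}
        l.locks[key] = lk
    }
    lk.refcount++
    l.mtx.Unlock()
    lk.mtx.Lock() // taken outside l.mtx so other keys are not blocked
}

// release unlocks key and drops its entry once nobody holds or waits for it.
// It must be paired with an earlier acquire(key).
func (l *locker) release(key string) {
    l.mtx.Lock()
    lk := l.locks[key]
    lk.refcount--
    if lk.refcount == 0 {
        delete(l.locks, key)
    }
    l.mtx.Unlock()
    lk.mtx.Unlock()
}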

Just call acquire(key) before processing a key and release(key) when done with it.
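
For illustration, here is a minimal sketch of that call pattern, using the "add the new value to the previous value" operation from the question. The locker types are repeated so the sketch compiles on its own; newLocker, change and applyAll are hypothetical names that are not part of the answer's code. The result is deterministic only because addition is commutative; the order in which goroutines for the same key run is still not guaranteed.

```go
package main

import (
	"fmt"
	"sync"
)

type processLock struct {
	mtx      sync.Mutex
	refcount int
}

type locker struct {
	mtx   sync.Mutex
	locks map[string]*processLock
}

// newLocker is a hypothetical constructor; the map must be initialized before use.
func newLocker() *locker {
	return &locker{locks: make(map[string]*processLock)}
}

func (l *locker) acquire(key string) {
	l.mtx.Lock()
	lk, found := l.locks[key]
	if !found {
		lk = &processLock{}
		l.locks[key] = lk
	}
	lk.refcount++
	l.mtx.Unlock()
	lk.mtx.Lock()
}

func (l *locker) release(key string) {
	l.mtx.Lock()
	lk := l.locks[key]
	lk.refcount--
	if lk.refcount == 0 {
		delete(l.locks, key)
	}
	l.mtx.Unlock()
	lk.mtx.Unlock()
}

// change is a hypothetical stand-in for one streamed (key, value) row.
type change struct {
	key   string
	value int
}

// applyAll starts one goroutine per change and adds each value to the
// running total of its key while holding that key's lock.
func applyAll(changes []change) map[string]int {
	l := newLocker()

	// Pre-create one cell per key so goroutines only read the map;
	// writes go to the cell, which is protected by the per-key lock.
	cells := make(map[string]*int)
	for _, c := range changes {
		if cells[c.key] == nil {
			cells[c.key] = new(int)
		}
	}

	var wg sync.WaitGroup
	for _, c := range changes {
		wg.Add(1)
		go func(c change) {
			defer wg.Done()
			l.acquire(c.key)
			defer l.release(c.key)
			*cells[c.key] += c.value
		}(c)
	}
	wg.Wait()

	totals := make(map[string]int, len(cells))
	for k, p := range cells {
		totals[k] = *p
	}
	return totals
}

func main() {
	totals := applyAll([]change{
		{"key_name_1", 1},
		{"key_name_2", 5},
		{"key_name_1", 2},
	})
	fmt.Println(totals["key_name_1"], totals["key_name_2"])
}
```

Using defer for release keeps the lock from leaking if the processing code returns early or panics.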

Live demo.

Warning! The code above guarantees exclusivity, but not sequence: goroutines waiting on the same key are not guaranteed to get the lock in the order they arrived. To preserve the order you need a FIFO mutex.
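
If per-key ordering is the hard requirement, one alternative to a FIFO mutex (a different technique from the locks above, sketched here under assumptions) is to route every key to a fixed worker goroutine through a channel. A single dispatcher reads the stream in order, a channel delivers in send order, and each worker handles its queue sequentially, so changes to the same key are applied in stream order while different keys can still run in parallel on different workers. The names change, shardFor and processInOrder are hypothetical, and the buffer size of 64 is arbitrary.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// change is a hypothetical stand-in for one streamed (key, value) row.
type change struct {
	key   string
	value int
}

// shardFor maps a key to a worker index; the same key always
// lands on the same worker.
func shardFor(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(n))
}

// processInOrder fans changes out to n workers and returns, per key,
// the values in the order in which they were applied.
func processInOrder(changes []change, n int) map[string][]int {
	queues := make([]chan change, n)
	results := make([]map[string][]int, n) // one map per worker, never shared
	var wg sync.WaitGroup

	for i := range queues {
		queues[i] = make(chan change, 64)
		results[i] = make(map[string][]int)
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// A worker drains its queue sequentially, so the per-key
			// order equals the order in which the dispatcher sent.
			for c := range queues[i] {
				results[i][c.key] = append(results[i][c.key], c.value)
			}
		}(i)
	}

	// Single dispatcher: reads the stream in order and routes by key.
	for _, c := range changes {
		queues[shardFor(c.key, n)] <- c
	}
	for _, q := range queues {
		close(q)
	}
	wg.Wait()

	// Each key lives on exactly one worker, so merging cannot collide.
	merged := make(map[string][]int)
	for _, r := range results {
		for k, v := range r {
			merged[k] = v
		}
	}
	return merged
}

func main() {
	out := processInOrder([]change{
		{"key_name_1", 1},
		{"key_name_2", 2},
		{"key_name_1", 3},
	}, 4)
	fmt.Println(out["key_name_1"], out["key_name_2"])
}
```

The trade-off is that two unrelated keys hashing to the same worker are also serialized; a larger worker count makes this less likely.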

Upvotes: 1
