Julia.T

Reputation: 139

process one message per user

I have a list in Redis that I use as a queue: I push elements on the left and pop from the right. Requests from different users get pushed onto the queue. I have a pool of goroutines that each pop a request off the queue and process it. I want to process only one request per userId at a time. I have a ReadRequest() function running forever that pops a request, and each request carries a userId. The requests for a given user need to be processed in the order they came in. I'm not sure how to implement this. Would I need a Redis list per userId? If so, how would I loop through all the lists and process the requests in them?

for i := 0; i < 5; i++ {
    wg.Add(1)
    go ReadRequest(&wg)
}

func ReadRequest(wg *sync.WaitGroup) {
    defer wg.Done()

    for {
        // pop a request off the Redis list
        request := MyRedisPop()
        fmt.Println(request.UserId)

        // only call Process if no other goroutine is processing a request for this user
        Process(request)

        time.Sleep(100000) // 100000 ns = 100 µs
    }
}

Upvotes: 0

Views: 87

Answers (1)

Tarun Khandelwal

Reputation: 491

Here is the pseudo-code that you could use without creating multiple Redis lists:

// maintain a global map for all users
// if you see a new user, call NewPerUser() and add it to the map
// Then, send the request to the corresponding channel for processing
var userMap = make(map[string]*PerUser)

type PerUser struct {
    ch        chan<- redis.Request // Whatever is the request type
    semaphore *semaphore.Weighted  // Semaphore to limit concurrent processing
}

func NewPerUser() *PerUser {
    ch := make(chan redis.Request)
    s := semaphore.NewWeighted(1) // Only 1 concurrent request is allowed
    go func() {
        for req := range ch {
            if err := s.Acquire(context.Background(), 1); err != nil {
                continue
            }
            Process(req) // Process the request here
            // Release inside the loop: a defer would only run when the
            // goroutine exits, so the second Acquire would block forever.
            s.Release(1)
        }
    }()
    return &PerUser{ch: ch, semaphore: s}
}

Please note that this is only pseudo-code and I haven't tested whether it works.

Upvotes: 1
