Alex
Alex

Reputation: 6099

Some questions about Redigo and concurrency

I have read through the whole Redigo documentation which can be found here. https://godoc.org/github.com/garyburd/redigo/redis#pkg-variables

Here the documentation clearly states that connections do not support concurrent calls to Send(), Flush() or Receive() methods.

Connections do not support concurrent calls to the write methods (Send, Flush) or concurrent calls to the read method (Receive). Connections do allow a concurrent reader and writer.

And then it states that since the Do method can be a combination of Send(), Flush() and Receive(), we can't use Do() concurrently (with) the other methods.

Because the Do method combines the functionality of Send, Flush and Receive, the Do method cannot be called concurrently with the other methods.

Does this mean that we can use Do() concurrently alone using a single connection stored in a global variable as long as we don't mix it with the other methods?

For example like this:

var (

    // Redis Conn.
    redisConn redis.Conn

    // Redis PubSubConn wraps a Conn with convenience methods for subscribers.
    redisPsc redis.PubSubConn
)

func redisInit() {

    c, err := redis.Dial(config.RedisProtocol, config.RedisAddress)
    if err != nil {
        log.Fatal(err)
    }
    c.Do("AUTH", config.RedisPass)
    redisConn = c

    c, err = redis.Dial(config.RedisProtocol, config.RedisAddress)
    if err != nil {
        log.Fatal(err)
    }
    c.Do("AUTH", config.RedisPass)
    redisPsc = redis.PubSubConn{c}

    for {
        switch v := redisPsc.Receive().(type) {
        case redis.Message:
            // fmt.Printf("%s: message: %s\n", v.Channel, v.Data)
            socketHub.broadcast <- v.Data
        case redis.Subscription:
            // fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)
        case error:
            log.Println(v)
        }
    }

}

And then calling the Do() method inside some go routine like this:

if _, err = redisConn.Do("PUBLISH", fmt.Sprintf("user:%d", fromId), message); err != nil {
    log.Println(err)
}
if _, err = redisConn.Do("PUBLISH", fmt.Sprintf("user:%d", toId), message); err != nil {
    log.Println(err)
}

And then later the document says that for full concurrent access to Redis, we need to create a pool and get connections from the pool and release them when we are done with it.

Does this mean that I can use, Send(), Flush() and Receive() as I want, as long as I get a connection from the pool? So in other words every time I need to do something in a go routine I have to get a new connection from the pool instead of reusing a global connection? And does this mean that I can use the Do() method with for example Send() as long as I get a new connection from the pool?

So to sum up:

1) Can I use the Do() method concurrently as long as I do not use it with the Send, Flush and Receive methods?

2) Can I use everything as I want as long as I get a new connection from the pool and release it when I'm done?

3) If (1) is true, does this affect performance? Is it better to use a global connection concurrently with only using the Do() method as in the provided example by me, and not mixing things up with Send, Flush and Receive?

Upvotes: 3

Views: 2939

Answers (1)

Thundercat
Thundercat

Reputation: 120941

You can have one concurrent writer and one concurrent reader. Because Do combines read and write operations, you can have one current current call to Do. To put this another way, you cannot call Do concurrently. You cannot store a connection in a global variable and call Do without protecting the connection with a mutex or using some other mechanism to ensure that there is no more than one concurrent caller to Do.

Pools support concurrent access. The connections returned by the pool Get method follow the rules for concurrency as described above. To get full concurrent access to the database, the application should within a single goroutine do the following: Get a connection from the pool; execute Redis commands on the connection; Close the connection to return the underlying resources to the pool.

Replace redisConn redis.Conn with a pool. Initialize the pool at app startup:

 var redisPool *redis.Pool

 ...

redisPool = &redis.Pool{
    MaxIdle: 3,  // adjust to your needs
    IdleTimeout: 240 * time.Second,  // adjust to your needs
    Dial: func () (redis.Conn, error) {
        c, err := redis.Dial(config.RedisProtocol, config.RedisAddress)
        if err != nil {
            return nil, err
        }
        if _, err := c.Do("AUTH", config.RedisPass); err != nil {
            c.Close()
            return nil, err
        }
        return c, err
    },
}

Use the pool to publish to the channels:

 c := redisPool.Get()
 if _, err = c.Do("PUBLISH", fmt.Sprintf("user:%d", fromId), message); err != nil {
    log.Println(err)
 }
 if _, err = c.Do("PUBLISH", fmt.Sprintf("user:%d", toId), message); err != nil {
    log.Println(err)
 }
 c.Close()

Do not initialize the pool in redisInit(). There's no guarantee that redisInit() will execute before other code in the application uses the pool.

Also add a call to Subscribe or PSubscribe.

Upvotes: 4

Related Questions