Alechko
Alechko

Reputation: 1598

Redis golang client periodically discarding bad PubSub connection (EOF)

What I did:

I'm using the golang Redis library from github.com/go-redis/redis. My client listens on a PubSub channel called 'control'. Whenever a message arrives, I handle it and continue receiving the next message. I listen endlessly and the messages can come often, or sometimes not for days.

What I expect:

I expect the redis channel to stay open endlessly and receive messages as they are sent.

What I experience:

Usually it runs well for days, but every once in a while client.Receive() returns EOF error. After this error, the client no longer receives messages on that channel. Internally, the redis client prints to stdout the following message:

redis: 2019/08/29 14:18:57 pubsub.go:151: redis: discarding bad PubSub connection: EOF

Disclaimer: I am not certain that this error is what causes me to stop receiving messages, it just seems related.

Additional questions:

I'd like to understand why this happens, if this is normal and if reconnecting to the channel via client.Subscribe() whenever I encounter the behaviour is a good remedy, or should I address the root issue, whatever it may be.

The code:

Here is the entire code that handles my client (connect to redis, subscribe to channel, endlessly receive messages):

func InitAndListenAsync(log *log.Logger, sseHandler func(string, string) error) error {
    rootLogger = log.With(zap.String("component", "redis-client"))

    host := env.RedisHost
    port := env.RedisPort
    pass := env.RedisPass
    addr := fmt.Sprintf("%s:%s", host, port)
    tlsCfg := &tls.Config{}
    client = redis.NewClient(&redis.Options{
        Addr:      addr,
        Password:  pass,
        TLSConfig: tlsCfg,
    })

    if _, err := client.Ping().Result(); err != nil {
        return err
    }

    go func() {
        controlSub := client.Subscribe("control")
        defer controlSub.Close()
        for {
            in, err := controlSub.Receive()  // *** SOMETIMES RETURNS EOF ERROR ***
            if err != nil {
                rootLogger.Error("failed to get feedback", zap.Error(err))
                break
            }
            switch in.(type) {
            case *redis.Message:
                cm := comm.ControlMessageEvent{}
                payload := []byte(in.(*redis.Message).Payload)
                if err := json.Unmarshal(payload, &cm); err != nil {
                    rootLogger.Error("failed to parse control message", zap.Error(err))
                } else if err := handleIncomingEvent(&cm); err != nil {
                    rootLogger.Error("failed to handle control message", zap.Error(err))
                }

            default:
                rootLogger.Warn("Received unknown input over REDIS PubSub control channel", zap.Any("received", in))
            }
        }
    }()
    return nil
}

Upvotes: 5

Views: 10957

Answers (3)

metarose
metarose

Reputation: 301

I don't If this is right approach but when creating new Redis client setting ReadTimeout property to -1 fixed the problem for me.

redisClient := redis.NewClient(&redis.Options{
    Addr:        addr,
    Password:    redisConf.Password,
    DB:          0, // Default DB
    ReadTimeout: -1,
})

Note: I am using go-redis/v9

Upvotes: 1

Alechko
Alechko

Reputation: 1598

I solved the disconnects by ranging over the channel returned from pubsub.Channel() instead of Receive().

Here's the new code:


func listenToControlChannel(client *redis.Client) {
    pubsub := client.Subscribe("control")
    defer pubsub.Close()

    if _, err := pubsub.Receive(); err != nil {
        rootLogger.Error("failed to receive from control PubSub", zap.Error(err))
        return
    }

    controlCh := pubsub.Channel()
    fmt.Println("start listening on control PubSub")

    // Endlessly listen to control channel,
    for msg := range controlCh {
        cm := ControlMessageEvent{}
        payload := []byte(msg.Payload)
        if err := json.Unmarshal(payload, &cm); err != nil {
            fmt.Printf("failed to parse control message: %s\n", err.Error())
        } else if err := handleIncomingEvent(&cm); err != nil {
            fmt.Printf("failed to handle control message: %s\n", err.Error())
        }
    }
}

Upvotes: 2

kostix
kostix

Reputation: 55473

My take on that is that Redis may disconnect your client if it thinks the client is idling.

A way to cope with this seems to be like that:

  1. Use ReceiveTimeout rather than Receive.
  2. If the operation timed out, issue Ping and wait for reply.
  3. Rinse, repeat.

That way, you'll be sure there exists some traffic on the connection—no matter whether there is any data actually published or not.

I'd start here.

Upvotes: 0

Related Questions