djechlin
djechlin

Reputation: 60828

Redis unsubscribe

Redis supports PUBSUB. Subscribing is easy enough:

redis 127.0.0.1:6379> subscribe foo
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "foo"
3) (integer) 1

However it seems impossible to unsubscribe, because while subscribing, the server does not accept commands. e.g. in the redis-cli client that ships with redis, control is not returned to the client, so if I type unsubscribe it doesn't go anywhere.

This seems like either a blatant error in the documentation, the function, or a PEBKAC issue. What gives?

Version:

$ ./redis-server --version
Redis server v=2.6.14 sha=00000000:0 malloc=libc bits=64

Upvotes: 5

Views: 7833

Answers (3)

user1611552
user1611552

Reputation: 621

Actually, PSUBSCRIBE will also block all subsequent commands as same as SUBSCRIBE, thus you can't ship any orders to the server but cast your eager gaze back to wait your interested channel for the incoming messages. Well, This ridiculous behavior makes my head spin. However, if you try to interact with redis by means of telnet (e.g telnet localhost 6379) instead of redis-cli prompt. Everything will be OK. Enjoy it.

Upvotes: 3

Nik
Nik

Reputation: 1334

An example of publishing subscribing and unsubscribing in c++.




#include <signal.h>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <hiredis/hiredis.h>
#include <hiredis/async.h>
#include <hiredis/adapters/libevent.h>
#include <boost/thread/thread.hpp>

using namespace std;

struct event_base* base;
std::string CHANNEL("");


void subCallback(redisAsyncContext *c, void *r, void *privdata) ;

void unSubscribe(redisAsyncContext* _redisContext){

  std::string input;

  while(1)
  {
    cin>>input;
    if(input.compare("unsub") == 0)
      break;
    sleep(5);
  }

  std::string command("unsubscribe ");
  command.append(CHANNEL);

  cout<<
    redisAsyncCommand(_redisContext, 
        subCallback, 
        (char*)"unsub", command.c_str())<<endl; 
}

void subCallback(redisAsyncContext *c, void *r, void *privdata) {

  redisReply *reply = (redisReply*)r;
  if (reply == NULL){
    cout<<"Response not recev"<<endl; 
    return;
  }
  if(reply->type == REDIS_REPLY_ARRAY & reply->elements == 3)
  {
    if(strcmp( reply->element[0]->str,"subscribe") != 0)
    {
      cout<<"Reply for:  "<<reply->element[0]->str<<endl;
      if(strcmp( reply->element[0]->str,"unsubscribe") == 0)
      {
        exit(1);
      }

      cout<<"Message received -> "<<
        reply->element[2]->str<<"( on channel : "<<reply->element[1]->str<<")"<<endl;
    }
  }
}


void pubCallback(redisAsyncContext *c, void *r, void *privdata) {

  redisReply *reply = (redisReply*)r;
  if (reply == NULL){
    cout<<"Response not recev"<<endl; 
    return;
  }
  cout<<"message published"<<endl;
  redisAsyncDisconnect(c);
}

void connectCallback(const redisAsyncContext *c, int status) {
  if (status != REDIS_OK) {
    cout<<"Error in connect: %s\n"<< c->errstr<<endl;
    return;
  }
  cout<<"Connected to redis server..."<<endl;
}

void disconnectCallback(const redisAsyncContext *c, int status) {
  if (status != REDIS_OK) {
    cout<<"Error in disconnect: %s\n"<< c->errstr<<endl;
    return;
  }
  cout<<"Disconnected...\n"<<endl;
}

int main(int argv, char** args)
{
  string processName(args[1]);

  signal(SIGPIPE, SIG_IGN);
//  struct event_base*
    base = event_base_new();

  redisAsyncContext* 
    _redisContext = redisAsyncConnect("10.0.0.30", 6379);

  if (_redisContext->err) {
    /* Let context leak for now... */
    cout<<"Error: "<< _redisContext->errstr<<endl;
    return 1;
  }

  redisLibeventAttach(_redisContext,base);
  //redisAsyncSetConnectCallback(_redisContext,connectCallback);
  //redisAsyncSetDisconnectCallback(_redisContext,disconnectCallback);

  if(processName.compare("pub") == 0)
  {
    string command ("publish ");
    command.append(args[2]);
    command.append (" ");
    command.append(args[3]);

    cout<<
      redisAsyncCommand(_redisContext, 
          pubCallback, 
          (char*)"pub", command.c_str())<<endl; 

  }
  else if(processName.compare("sub") == 0)
  {
    boost::thread unsubscribe(&unSubscribe, _redisContext);

    string command ("subscribe ");
    command.append(args[2]);

    CHANNEL.append(args[2]);

    cout<<
      redisAsyncCommand(_redisContext, 
          subCallback, 
          (char*)"sub", command.c_str())<<endl; 

  }
  else
    cout<<"Try pub or sub"<<endl;

  event_base_dispatch(base);

  return 0;
}

Code can be run to test subscription , publish and unsubscribe:

  • subscribe: (1st exe)

    ./pubsub channel1
    
  • publish: (2nd exe)

    ./pubsub pub channel1 hi
    

    Output in 1st exe

    0

    Reply for: message

    Message received -> hi( on channel : channel1)

  • Now unsubscribe 1st exe, type unsub in 1st exe console. Final output:

    0 Reply for: message

    Message received -> hi( on channel : channel1)

    unsub

    0

    Reply for: unsubscribe

Upvotes: 0

Homer6
Homer6

Reputation: 15159

By client, I believe they mean the list of clients here:

http://redis.io/clients

As someone who has consumed the hiredis client, I presume that this recommendation:

Once the client enters the subscribed state it is not supposed to issue any other commands, except for additional SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE and PUNSUBSCRIBE commands.

on this page: http://redis.io/commands/subscribe applies only to those clients.

The redis-cli is among those clients. So, the comment is not an instruction for users of redis-cli.

Instead, redis-cli blocks waiting for messages on the bus (only to be unsubcribed via a ctrl+c).

If you were to use a different client (or more specifically, if you were implementing one), my guess is that you'd have to observe that convention so that it would be in a subscribed state (though the client wouldn't necessarily be blocking).

I think the documentation could be a little clearer to disambiguate that; however, the documentation is on the server itself and not the redis-cli application. You could, however, make the adjustment in the documentation repo and submit a pull request.

https://github.com/antirez/redis-doc/blob/master/commands/subscribe.md

Upvotes: 6

Related Questions