crockpotveggies

Reputation: 13320

Scala-redis subscribes to * but receives zero messages

I'm integrating Redis with my Scala application using Akka, but for some reason the subscriber does not receive any messages. I can confirm from redis-cli on the command line that Redis itself is seeing plenty of traffic.

After a pSubscribe, the callback logs only the confirmation: subscribed to * and count = 1

My guess is that it might be related to the way Akka is set up to receive callbacks. I had to strip out Scala Actors in the scala-redis lib and replace them with Akka actors due to some conflicts.

Here's the code:

The Subscriber Actor

class Subscriber(client: RedisClient) extends Actor {
  var callback: PubSubMessage => Any = { m => }

  def receive: Receive = { 
    case Subscribe(channels) =>
      client.subscribe(channels.head, channels.tail: _*)(callback)

    case pSubscribe(channels) =>
      client.pSubscribe(channels.head, channels.tail: _*)(callback)

    case pSubscribeAll(channels) =>
      Logger.info("Subscribing to all channels")
      client.pSubscribe(channels.head, channels.tail: _*)(callback)

    case Register(cb) =>
      Logger.info("Callback is registered")
      callback = cb

    case Unsubscribe(channels) =>
      client.unsubscribe(channels.head, channels.tail: _*)

    case UnsubscribeAll =>
      client.unsubscribe
  }
}

Initializing the Subscriber

class RelaySub extends Actor {

  // important config values
  val system = ActorSystem("pubsub")
  val conf = play.api.Play.current.configuration
  val relayPubHost = conf.getString("relays.redis.host").get
  val relayPubPort = conf.getInt("relays.redis.port").get

  val rs = new RedisClient(relayPubHost, relayPubPort)
  val s = system.actorOf(Props(new Subscriber(rs)))
  s ! Register(callback) 
  s ! pSubscribeAll(Array("*"))
  Logger.info("Engine Relay Subscriber has started up")

  def receive: Receive = {      
    case Register(callback) =>
  }

  def callback(pubsub: PubSubMessage) = pubsub match {
    case S(channel, no) => Logger.info("subscribed to " + channel + " and count = " + no)
    case U(channel, no) => Logger.info("unsubscribed from " + channel + " and count = " + no)
    case M(channel, msg) =>
      msg match {
        // exit will unsubscribe from all channels and stop subscription service
        case "exit" =>
          Logger.info("unsubscribe all ... no handler yet ;)")

        // message "+x" will subscribe to channel x
        case x if x startsWith "+" =>
          Logger.info("subscribe to ... no handler yet ;)")

        // message "-x" will unsubscribe from channel x
        case x if x startsWith "-" =>
          Logger.info("unsubscribe from ... no handler yet ;)")

        // other message receive
        case x =>
          Logger.info("Engine: received redis message")
          // split takes a regex, so the dot must be escaped; split(".") yields an empty array
          val channelVars = channel.split("\\.")
          if (channelVars(0) != Engine.instanceID)
            channelVars(1) match {
              case "relay" =>
                EngineSyncLocal.constructRelay(channel, msg)
              case _ =>
                Logger.error("Engine: received unknown redis message")
            }
      }
  }
}

Thanks for your help!

Upvotes: 2

Views: 493

Answers (2)

Arneball

Reputation: 369

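The asList match is missing a case. A pmessage reply has four elements (type, pattern, channel, message), so the three-element pattern never matches it. Add a four-element case after the inner msgType match closes:

              case x => throw new RuntimeException("unhandled message: " + x)
            }
          case Some(Some("pmessage") :: Some(pattern) :: Some(channel) :: Some(message) :: Nil) =>
            fn(M(channel, message))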
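For illustration, the extra case can be tried in isolation. This is only a minimal sketch: it assumes asList yields an Option[List[Option[String]]], and the PubSubMessage, M, Reply, and decode definitions here are hypothetical stand-ins rather than the scala-redis types.

```scala
object PMessageCase {
  // Hypothetical stand-ins for the message types used in the question
  sealed trait PubSubMessage
  case class M(channel: String, message: String) extends PubSubMessage

  // Assumed shape of what asList returns
  type Reply = Option[List[Option[String]]]

  // Returns the decoded message, or None if the reply shape is not recognised
  def decode(reply: Reply): Option[PubSubMessage] = reply match {
    // three elements: type, channel, payload
    case Some(Some("message") :: Some(channel) :: Some(data) :: Nil) =>
      Some(M(channel, data))
    // four elements: type, pattern, channel, payload
    case Some(Some("pmessage") :: Some(_) :: Some(channel) :: Some(data) :: Nil) =>
      Some(M(channel, data))
    case _ => None
  }
}
```

With the four-element case in place, a reply published to a pattern subscription decodes to M(channel, payload) instead of falling through.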

Upvotes: 0

crockpotveggies

Reputation: 13320

I found the problem. It appears to be a bug in the scala-redis client.

I added some logging to the Consumer class and began seeing "Engine: weird redis message" errors, which means the consumer does not recognize the incoming traffic. I'll contact the author and put in a pull request.

The code:

class Consumer(fn: PubSubMessage => Any) extends Runnable {

    def start () {
      val myThread = new Thread(this) ;
      myThread.start() ;
    }

    def run {
      whileTrue {
        asList match {
          case Some(Some(msgType) :: Some(channel) :: Some(data) :: Nil) =>
            Logger.info("Engine: redis traffic")
            msgType match {
              case "subscribe" | "psubscribe" => fn(S(channel, data.toInt))
              case "unsubscribe" if (data.toInt == 0) => 
                fn(U(channel, data.toInt))
                break
              case "punsubscribe" if (data.toInt == 0) => 
                fn(U(channel, data.toInt))
                break
              case "unsubscribe" | "punsubscribe" => 
                fn(U(channel, data.toInt))
              case "message" | "pmessage" => 
                fn(M(channel, data))
              case x => throw new RuntimeException("unhandled message: " + x)
            }
          case _ => Logger.error("Engine: weird redis message")
        }
      }
    }
  }
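The symptom can be reproduced without a Redis connection. The sketch below is an assumption-laden illustration, not the library's code: it assumes asList returns an Option[List[Option[String]]], and the matchesThreeElements helper and sample replies are hypothetical. It mirrors only the outer pattern of the Consumer above.

```scala
object WeirdMessageRepro {
  // Assumed shape of what asList returns
  type Reply = Option[List[Option[String]]]

  // True when the reply fits the Consumer's only outer pattern (exactly three elements)
  def matchesThreeElements(reply: Reply): Boolean = reply match {
    case Some(Some(_) :: Some(_) :: Some(_) :: Nil) => true
    case _ => false
  }

  // Confirmation of a pattern subscription: type, pattern, subscription count
  val subscribed: Reply = Some(List(Some("psubscribe"), Some("*"), Some("1")))

  // A message delivered through a pattern subscription: type, pattern, channel, payload
  val published: Reply =
    Some(List(Some("pmessage"), Some("*"), Some("engine1.relay"), Some("payload")))
}
```

Under these assumptions the subscription confirmation matches while the published message does not, which fits what I saw: the "subscribed to *" log line appears, and every later message ends up in the catch-all branch.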

Upvotes: 2
