I'm integrating Redis with my Scala application using Akka, but for some reason the application does not receive any messages. I can confirm that Redis has a ton of traffic on its side by opening redis-cli on the command line.
After a pSubscribe, the subscriber receives: subscribed to * and count = 1
My guess is that it might be related to the way Akka is set up to receive callbacks. I had to strip out the Scala actors in the scala-redis library and replace them with Akka actors due to some conflicts.
Here's the code:
    class Subscriber(client: RedisClient) extends Actor {
      // Callback invoked for every pub/sub message; a no-op until Register arrives
      var callback: PubSubMessage => Any = { m => }

      def receive: Receive = {
        case Subscribe(channels) =>
          client.subscribe(channels.head, channels.tail: _*)(callback)
        case pSubscribe(channels) =>
          client.pSubscribe(channels.head, channels.tail: _*)(callback)
        case pSubscribeAll(channels) =>
          Logger.info("Subscribing to all channels")
          client.pSubscribe(channels.head, channels.tail: _*)(callback)
        case Register(cb) =>
          Logger.info("Callback is registered")
          callback = cb
        case Unsubscribe(channels) =>
          client.unsubscribe(channels.head, channels.tail: _*)
        case UnsubscribeAll =>
      }
    }

    class RelaySub extends Actor {
      // important config values
      val system = ActorSystem("pubsub")
      val conf = play.api.Play.current.configuration
      val relayPubHost = conf.getString("").get
      val relayPubPort = conf.getInt("relays.redis.port").get
      val rs = new RedisClient(relayPubHost, relayPubPort)
      val s = system.actorOf(Props(new Subscriber(rs)))

      s ! Register(callback)
      s ! pSubscribeAll(Array("*"))
      Logger.info("Engine Relay Subscriber has started up")

      def receive: Receive = {
        case Register(callback) =>
      }

      def callback(pubsub: PubSubMessage) = pubsub match {
        case S(channel, no) => Logger.info("subscribed to " + channel + " and count = " + no)
        case U(channel, no) => Logger.info("unsubscribed from " + channel + " and count = " + no)
        case M(channel, msg) =>
          msg match {
            // exit will unsubscribe from all channels and stop subscription service
            case "exit" => Logger.info("unsubscribe all ... no handler yet ;)")
            // message "+x" will subscribe to channel x
            case x if x startsWith "+" => Logger.info("subscribe to ... no handler yet ;)")
            // message "-x" will unsubscribe from channel x
            case x if x startsWith "-" => Logger.info("unsubscribe from ... no handler yet ;)")
            // other message receive
            case x =>
              Logger.info("Engine: received redis message")
              // split takes a regex, so the dot must be escaped;
              // split(".") matches every character and returns an empty array
              val channelVars = channel.split("\\.")
              channelVars(1) match {
                case "relay" =>
                  EngineSyncLocal.constructRelay(channel, msg)
                case _ =>
                  Logger.error("Engine: received unknown redis message")
              }
          }
      }
    }

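The asList match in the Consumer is missing a case. A pmessage reply carries four elements (type, pattern, channel, message) rather than three, so it needs its own case after the msgType match:

            case x => throw new RuntimeException("unhandled message: " + x)
          }
        case Some(Some("pmessage") :: Some(pattern) :: Some(channel) :: Some(message) :: Nil) =>
          fn(M(channel, message))
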
I found the problem. It appears to be a bug in the scala-redis client.
I added some logging in the Consumer class and began receiving "Engine: weird redis message" errors, which means that it doesn't recognize the incoming traffic. I'll contact the author and put in a pull request.
The Consumer code, with the added logging:
    class Consumer(fn: PubSubMessage => Any) extends Runnable {

      def start(): Unit = {
        val myThread = new Thread(this)
        myThread.start()
      }

      def run(): Unit = {
        whileTrue {
          asList match {
            case Some(Some(msgType) :: Some(channel) :: Some(data) :: Nil) =>
              Logger.info("Engine: redis traffic")
              msgType match {
                case "subscribe" | "psubscribe" => fn(S(channel, data.toInt))
                case "unsubscribe" if (data.toInt == 0) =>
                  fn(U(channel, data.toInt))
                case "punsubscribe" if (data.toInt == 0) =>
                  fn(U(channel, data.toInt))
                case "unsubscribe" | "punsubscribe" =>
                  fn(U(channel, data.toInt))
                case "message" | "pmessage" =>
                  fn(M(channel, data))
                case x => throw new RuntimeException("unhandled message: " + x)
              }
            case _ => Logger.error("Engine: weird redis message")
          }
        }
      }
    }
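
For illustration, here is a minimal, self-contained sketch of how the reply decoding could handle the four-element pmessage reply that the Consumer above falls through on. The ReplyDecoder object and its decode function are hypothetical names, and the S, U and M case classes are simplified stand-ins for the scala-redis types, so treat this as a sketch under those assumptions rather than the library's actual code.

```scala
// Simplified, hypothetical stand-ins for the scala-redis pub/sub message types.
sealed trait PubSubMessage
case class S(channel: String, noSubscribed: Int) extends PubSubMessage
case class U(channel: String, noSubscribed: Int) extends PubSubMessage
case class M(origChannel: String, message: String) extends PubSubMessage

object ReplyDecoder {
  // Decodes one multi-bulk reply, modelled the way asList returns it:
  // an optional list of optional strings. Returns None for unknown shapes.
  def decode(reply: Option[List[Option[String]]]): Option[PubSubMessage] = reply match {
    // Pattern subscriptions deliver four elements: type, pattern, channel, payload.
    case Some(Some("pmessage") :: Some(_) :: Some(channel) :: Some(data) :: Nil) =>
      Some(M(channel, data))

    // Everything else used here arrives as three elements: type, channel, data.
    case Some(Some(msgType) :: Some(channel) :: Some(data) :: Nil) =>
      msgType match {
        case "subscribe" | "psubscribe"     => Some(S(channel, data.toInt))
        case "unsubscribe" | "punsubscribe" => Some(U(channel, data.toInt))
        case "message"                      => Some(M(channel, data))
        case _                              => None
      }

    // Any other shape is the "weird redis message" case from the Consumer.
    case _ => None
  }
}
```

With this shape, a reply of List(Some("pmessage"), Some("*"), Some("engine.relay"), Some("hello")) wrapped in Some decodes to M("engine.relay", "hello") instead of falling through to the catch-all case.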
