Reputation: 13320
I'm integrating Redis with my Scala application using Akka, but for some reason the subscriber does not receive any messages. I can confirm that Redis has a ton of traffic on its side by watching it in redis-cli on the command line.
After a pSubscribe, the only thing the callback receives is the confirmation: subscribed to * and count = 1
My guess is that it might be related to the way Akka is set up to receive callbacks. I had to strip out Scala Actors in the scala-redis lib and replace them with Akka actors due to some conflicts.
Here's the code:
class Subscriber(client: RedisClient) extends Actor {
  var callback: PubSubMessage => Any = { m => }

  def receive: Receive = {
    case Subscribe(channels) =>
      client.subscribe(channels.head, channels.tail: _*)(callback)
    case pSubscribe(channels) =>
      client.pSubscribe(channels.head, channels.tail: _*)(callback)
    case pSubscribeAll(channels) =>
      Logger.info("Subscribing to all channels")
      client.pSubscribe(channels.head, channels.tail: _*)(callback)
    case Register(cb) =>
      Logger.info("Callback is registered")
      callback = cb
    case Unsubscribe(channels) =>
      client.unsubscribe(channels.head, channels.tail: _*)
    case UnsubscribeAll =>
      client.unsubscribe
  }
}
class RelaySub extends Actor {
  // important config values
  val system = ActorSystem("pubsub")
  val conf = play.api.Play.current.configuration
  val relayPubHost = conf.getString("relays.redis.host").get
  val relayPubPort = conf.getInt("relays.redis.port").get
  val rs = new RedisClient(relayPubHost, relayPubPort)
  val s = system.actorOf(Props(new Subscriber(rs)))
  s ! Register(callback)
  s ! pSubscribeAll(Array("*"))
  Logger.info("Engine Relay Subscriber has started up")

  def receive: Receive = {
    case Register(callback) =>
  }

  def callback(pubsub: PubSubMessage) = pubsub match {
    case S(channel, no) => Logger.info("subscribed to " + channel + " and count = " + no)
    case U(channel, no) => Logger.info("unsubscribed from " + channel + " and count = " + no)
    case M(channel, msg) =>
      msg match {
        // exit will unsubscribe from all channels and stop subscription service
        case "exit" =>
          Logger.info("unsubscribe all ... no handler yet ;)")
        // message "+x" will subscribe to channel x
        case x if x startsWith "+" =>
          Logger.info("subscribe to ... no handler yet ;)")
        // message "-x" will unsubscribe from channel x
        case x if x startsWith "-" =>
          Logger.info("unsubscribe from ... no handler yet ;)")
        // other message receive
        case x =>
          Logger.info("Engine: received redis message")
          // split on the Char '.'; split(".") takes a regex in which "." matches
          // every character, so it would return an empty array
          val channelVars = channel.split('.')
          if (channelVars(0) != Engine.instanceID)
            channelVars(1) match {
              case "relay" =>
                EngineSyncLocal.constructRelay(channel, msg)
              case _ =>
                Logger.error("Engine: received unknown redis message")
            }
      }
  }
}
Thanks for your help!
Upvotes: 2
Views: 493
Reputation: 369
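The asList match is missing a case. A pmessage reply carries four elements (type, pattern, channel, message), so the existing three-element pattern never matches it. Add a four-element case after the existing one:
    case x => throw new RuntimeException("unhandled message: " + x)
  }
case Some(Some("pmessage") :: Some(pattern) :: Some(channel) :: Some(message) :: Nil) =>
  fn(M(channel, message))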
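The missing case can be illustrated with a small standalone sketch. It assumes a reply has the shape Option[List[Option[String]]], which is what the patterns in Consumer imply for asList. The classify helper, the Reply alias, and the sample values are hypothetical and not part of scala-redis.

```scala
object PMessageSketch {
  // Hypothetical stand-in for the value that asList yields in Consumer.
  type Reply = Option[List[Option[String]]]

  // Mirrors the shape of the match in Consumer, plus the four-element case.
  def classify(reply: Reply): String = reply match {
    // PSUBSCRIBE traffic: type, pattern, channel, payload.
    case Some(Some("pmessage") :: Some(pattern) :: Some(channel) :: Some(message) :: Nil) =>
      "pmessage on " + channel + " via " + pattern + ": " + message
    // SUBSCRIBE traffic and (un)subscribe confirmations: type, channel, payload.
    case Some(Some(msgType) :: Some(channel) :: Some(data) :: Nil) =>
      msgType + " on " + channel + ": " + data
    // Anything else is what the logging reported as unrecognized.
    case _ =>
      "weird redis message"
  }

  def main(args: Array[String]): Unit = {
    println(classify(Some(List(Some("message"), Some("a.relay"), Some("hi")))))
    println(classify(Some(List(Some("pmessage"), Some("*"), Some("a.relay"), Some("hi")))))
  }
}
```

If the first case is removed, the four-element reply falls through to the catch-all branch, which matches the symptom in the question: the three-element psubscribe confirmation arrives, but no messages do.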
Upvotes: 0
Reputation: 13320
I found the problem. It appears to be a bug in the scala-redis client.
I added some logging in the Consumer class and began receiving "Engine: weird redis message" errors, which means that the client doesn't recognize the incoming traffic. I'll contact the author and put in a pull request.
The code:
class Consumer(fn: PubSubMessage => Any) extends Runnable {
  def start() {
    val myThread = new Thread(this)
    myThread.start()
  }

  def run {
    whileTrue {
      asList match {
        case Some(Some(msgType) :: Some(channel) :: Some(data) :: Nil) =>
          Logger.info("Engine: redis traffic")
          msgType match {
            case "subscribe" | "psubscribe" => fn(S(channel, data.toInt))
            case "unsubscribe" if (data.toInt == 0) =>
              fn(U(channel, data.toInt))
              break
            case "punsubscribe" if (data.toInt == 0) =>
              fn(U(channel, data.toInt))
              break
            case "unsubscribe" | "punsubscribe" =>
              fn(U(channel, data.toInt))
            case "message" | "pmessage" =>
              fn(M(channel, data))
            case x => throw new RuntimeException("unhandled message: " + x)
          }
        case _ => Logger.error("Engine: weird redis message")
      }
    }
  }
}
Upvotes: 2