Reputation:
The code below is a simple example of actor that would like to communicate with a remote server using an enumerator. It should be possible to push new data to the enumerator. However, I'm not sure how to do so. I found a solution in this question, but the the Enumerator.imperative is deprecated and according to the Play! docs it seems that it was replace by Concurrent.unicast, which doesn't have the push method.
// WorkerActor
val stdin = Concurrent.unicast[Array[Byte]](onStart = channel => {
channel.push("uname -a\n".toCharArray.map(_.toByte)) // First message
}) >>> Enumerator.eof
attachStream(stdin)
def receive = LoggingReceive {
case message: Array[Byte] =>
// TODO: push the message to the stream
// stdin push message ?
...
}
Thank you for any help you can provide.
Upvotes: 0
Views: 66
Reputation: 11479
You would need to capture the channel, you could for example do something like to do this inside of an actor:
// WorkerActor
case class GotChannel(channel: Channel[Array[Byte]])
case object ChannelClosed
case class ChannelError(msg: String)
val stdin = Concurrent.unicast[Array[Byte]](
// you cannot change actor state from in here, it is on another thread
onStart = channel => self ! GotChannel(channel),
onComplete = () => self ! ChannelClosed,
onError = (msg, _) => self ! ChannelError(msg)
) >>> Enumerator.eof
attachStream(stdin)
def receive = {
case GotChannel(channel) =>
channel.push("uname -a\n".toCharArray.map(_.toByte))
context.become(active(channel))
}
def active(channel: Channel[Array[Byte]]): Actor.Receive = LoggingReceive {
case message: Array[Byte] =>
// push the message to the stream
channel.push(message)
...
// handle ChannelClosed and ChannelError here
}
Upvotes: 2