user267817
user267817

Reputation:

How to push data to enumerator

The code below is a simple example of actor that would like to communicate with a remote server using an enumerator. It should be possible to push new data to the enumerator. However, I'm not sure how to do so. I found a solution in this question, but the the Enumerator.imperative is deprecated and according to the Play! docs it seems that it was replace by Concurrent.unicast, which doesn't have the push method.

// WorkerActor
val stdin = Concurrent.unicast[Array[Byte]](onStart = channel => {
  channel.push("uname -a\n".toCharArray.map(_.toByte)) // First message
}) >>> Enumerator.eof

attachStream(stdin)

def receive = LoggingReceive {
  case message: Array[Byte] =>
    // TODO: push the message to the stream
    // stdin push message ?
    ...
}

Thank you for any help you can provide.

Upvotes: 0

Views: 66

Answers (1)

johanandren
johanandren

Reputation: 11479

You would need to capture the channel, you could for example do something like to do this inside of an actor:

// WorkerActor
case class GotChannel(channel: Channel[Array[Byte]])
case object ChannelClosed
case class ChannelError(msg: String)
val stdin = Concurrent.unicast[Array[Byte]](
  // you cannot change actor state from in here, it is on another thread
  onStart = channel => self ! GotChannel(channel),
  onComplete = () => self ! ChannelClosed,
  onError = (msg, _) => self ! ChannelError(msg)
) >>> Enumerator.eof

attachStream(stdin)

def receive = {
  case GotChannel(channel) =>
    channel.push("uname -a\n".toCharArray.map(_.toByte))
    context.become(active(channel))
}

def active(channel: Channel[Array[Byte]]): Actor.Receive = LoggingReceive {
  case message: Array[Byte] =>
    // push the message to the stream
    channel.push(message)
    ...
  // handle ChannelClosed and ChannelError here
}

Upvotes: 2

Related Questions