Reputation: 61538
In a simple test, I expect the flow to generate and print numbers for one second. I want to test stream operations that deal with backpressure, so I need a Source that does not honor backpressure.
... with FreeSpec ... {
  implicit val system = ActorSystem(this.getClass.getSimpleName)
  private val matSettings: ActorMaterializerSettings =
    ActorMaterializerSettings(system).withDebugLogging(true).withFuzzing(true)
  implicit val mat = ActorMaterializer(matSettings.withInputBuffer(1, 1))
  "must print numbers for a second" in {
    val source: Source[Double, ActorRef] =
      Source.actorRef(100, OverflowStrategy.fail).map(_ => Random.nextDouble())
    val sink: Sink[Double, Future[Done]] = Sink.foreach(println)
    val actorRef: ActorRef = Flow[Double].to(sink).runWith(source)
    system.scheduler.schedule(0.micro, 1.milli, actorRef, "tick")(system.dispatcher)
    Thread.sleep(1000)
    println("done")
  }
However, the actor seems to stop immediately after the flow has been materialized: not a single message gets delivered, and only two are sent. What am I misunderstanding here, and how do I get the expected result? The log:
08:24:07.077 DEBUG a.a.LocalActorRefProvider$SystemGuardian - now supervising Actor[akka://BuffersAndTicksSpec/system/UnhandledMessageForwarder#1836419858]
08:24:07.078 DEBUG akka.event.EventStream - subscribing Actor[akka://BuffersAndTicksSpec/system/UnhandledMessageForwarder#1836419858] to channel class akka.actor.UnhandledMessage
08:24:07.079 DEBUG akka.event.EventStream - Default Loggers started
08:24:07.079 DEBUG a.e.LoggingBus$$anonfun$startDefaultLoggers$2$$anon$3 - started (akka.event.LoggingBus$$anonfun$startDefaultLoggers$2$$anon$3@26bf15e8)
08:24:07.079 DEBUG akka.event.EventStream - unsubscribing StandardOutLogger from all channels
08:24:07.080 DEBUG a.a.LocalActorRefProvider$SystemGuardian - now supervising Actor[akka://BuffersAndTicksSpec/system/deadLetterListener#-876496689]
08:24:07.080 DEBUG akka.event.EventStream - subscribing Actor[akka://BuffersAndTicksSpec/system/deadLetterListener#-876496689] to channel class akka.actor.DeadLetter
08:24:07.080 DEBUG akka.event.DeadLetterListener - started (akka.event.DeadLetterListener@691f7305)
08:24:07.081 DEBUG a.a.LocalActorRefProvider$SystemGuardian - now supervising Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434]
08:24:07.081 DEBUG akka.event.EventStreamUnsubscriber - registering unsubscriber with akka.event.EventStream@a27e5b9
08:24:07.081 DEBUG akka.event.EventStream - initialized unsubscriber to: Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434], registering 3 initial subscribers with it
08:24:07.082 DEBUG akka.event.EventStreamUnsubscriber - started (akka.event.EventStreamUnsubscriber@75843fb6)
08:24:07.082 DEBUG akka.event.EventStreamUnsubscriber - watching Actor[akka://BuffersAndTicksSpec/system/log1-Slf4jLogger#28712451] in order to unsubscribe from EventStream when it terminates
08:24:07.082 DEBUG akka.event.EventStreamUnsubscriber - watching Actor[akka://BuffersAndTicksSpec/system/UnhandledMessageForwarder#1836419858] in order to unsubscribe from EventStream when it terminates
08:24:07.082 DEBUG akka.event.slf4j.Slf4jLogger - now watched by Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434]
08:24:07.083 DEBUG a.e.LoggingBus$$anonfun$startDefaultLoggers$2$$anon$3 - now watched by Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434]
08:24:07.083 DEBUG akka.event.EventStreamUnsubscriber - watching Actor[akka://BuffersAndTicksSpec/system/deadLetterListener#-876496689] in order to unsubscribe from EventStream when it terminates
08:24:07.083 DEBUG akka.event.DeadLetterListener - now watched by Actor[akka://BuffersAndTicksSpec/system/eventStreamUnsubscriber-1#722751434]
08:24:07.164 DEBUG a.a.LocalActorRefProvider$Guardian - now supervising Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0#-544371192]
08:24:07.164 DEBUG akka.stream.impl.StreamSupervisor - started (akka.stream.impl.StreamSupervisor@6fd27b4e)
08:24:07.173 WARN a.stream.impl.ActorMaterializerImpl - Fuzzing mode is enabled on this system. If you see this warning on your production system then set akka.stream.materializer.debug.fuzzing-mode to off.
08:24:07.270 DEBUG akka.stream.impl.StreamSupervisor - now supervising Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-0-unknown-operation#478879902]
08:24:07.276 DEBUG a.s.i.fusing.ActorGraphInterpreter - started (akka.stream.impl.fusing.ActorGraphInterpreter@42111ea7)
08:24:07.276 DEBUG akka.stream.impl.StreamSupervisor - now supervising Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-1-actorRefSource#1199332529]
08:24:07.283 DEBUG akka.stream.impl.ActorRefSourceActor - started (akka.stream.impl.ActorRefSourceActor@6dbd251e)
08:24:07.289 INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://BuffersAndTicksSpec/deadLetters] to Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-1-actorRefSource#1199332529] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
08:24:07.290 DEBUG akka.stream.impl.ActorRefSourceActor - stopped
08:24:07.291 DEBUG a.s.i.fusing.ActorGraphInterpreter - stopped
08:24:07.297 INFO akka.actor.RepointableActorRef - Message [java.lang.String] from Actor[akka://BuffersAndTicksSpec/deadLetters] to Actor[akka://BuffersAndTicksSpec/user/StreamSupervisor-0/flow-0-1-actorRefSource#1199332529] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
done
Upvotes: 1
Views: 434
Reputation: 9023
The problem here is with the type of your Source.actorRef. Even though an actor can receive a message of Any type, when you wrap it in a Source you need to give it a type (to fulfil Akka Streams' strong typing).
Example:
val source: Source[Int, ActorRef] = Source.actorRef[Int](100, OverflowStrategy.fail)
What happens under the hood is that your Source will try to cast every incoming message to Int.
In your case the Source.actorRef is not explicitly typed, hence Nothing is inferred by the compiler. (This is masked by the fact that you are appending the map stage, where everything becomes a Double.) All your incoming "tick" messages are cast to Nothing, causing a ClassCastException, which fails the stream and stops the source actor.
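The inference problem can be reproduced without Akka. The following is only a minimal sketch: Inbox, create and deliver are hypothetical names invented for illustration, not part of any library. Inbox stands in for an actor-backed source that accepts Any at runtime but claims to emit elements of type T.

```scala
import scala.util.Try

object NothingInferenceSketch {
  // Hypothetical stand-in for an actor-backed source: it accepts Any at
  // runtime, but hands the message to downstream code as a T.
  final class Inbox[T] {
    // Because of type erasure, asInstanceOf[T] is unchecked here; a mismatch
    // only surfaces once the function f is applied to the element.
    def deliver[U](msg: Any)(f: T => U): U = f(msg.asInstanceOf[T])
  }

  object Inbox {
    def create[T]: Inbox[T] = new Inbox[T]
  }

  // No type argument and no expected type: the compiler infers Inbox[Nothing],
  // so applying the function to a String is expected to fail with a
  // ClassCastException, which Try captures as a Failure.
  val untyped: Try[Double] = Try(Inbox.create.deliver("tick")(_ => 1.0))

  // Explicit element type: the String message is accepted and mapped.
  val typed: Try[Double] = Try(Inbox.create[String].deliver("tick")(_ => 1.0))
}
```

Both calls compile without complaint, which is why the mistake is easy to miss: only the first one fails, and only at runtime.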
The solution is to type your Source.actorRef stage:
val source: Source[Double, ActorRef] =
  Source.actorRef[String](100, OverflowStrategy.fail).map(_ => Random.nextDouble())
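For completeness, here is a sketch of how the whole test could look as a standalone program. It assumes Akka 2.5.x on the classpath (some of these calls are deprecated in later versions), and the object name, the run method, and the counting sink are my own additions rather than part of the question. Keeping the sink's materialized Future makes a stream failure visible instead of letting it disappear silently.

```scala
import akka.actor.{ActorRef, ActorSystem, Status}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Random

object TypedActorRefSourceSketch {
  // Runs the stream for roughly one second and returns how many numbers were printed.
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("TypedActorRefSourceSketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // The element type is explicit, so the "tick" Strings are accepted.
      val source: Source[Double, ActorRef] =
        Source.actorRef[String](100, OverflowStrategy.fail).map(_ => Random.nextDouble())

      // Print each number and count it; the Future completes with the total,
      // or fails with the exception that terminated the stream.
      val sink: Sink[Double, Future[Int]] =
        Sink.fold[Int, Double](0) { (count, d) => println(d); count + 1 }

      // Keep.both exposes the source's ActorRef and the sink's Future.
      val (actorRef, result) = source.toMat(sink)(Keep.both).run()

      val ticks =
        system.scheduler.schedule(0.millis, 1.milli, actorRef, "tick")(system.dispatcher)
      Thread.sleep(1000)
      ticks.cancel()

      // Status.Success completes the source once buffered elements are emitted.
      actorRef ! Status.Success("done")

      // Rethrows the stream's failure, if there was one.
      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

If the element type is removed again, Await.result should rethrow the ClassCastException, so the cause of the early stop shows up in the test itself and not only in the debug log.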
Upvotes: 4