Reputation: 337
I am working with a Scala 2.13 stack with the following technologies:
An Akka-stream job reads event from Kafka, asks an actor to compute something, and based on the given response, produces new events back to Kafka.
The issue is the messages sent using the ask pattern seem to be consumed by the QuestionActor
(below) only when at least two messages are gathered by its mailbox and only one per message received.
The weird behavior is:
t1
ref ? Question("tr1", 1, None, actorRef)
> AskTimeoutException(tr1)
t2
ref ? Question("tr2", 1, None, actorRef)
> [INFO] - Question request for tr1-1. Processing.
> AskTimeoutException(tr2)
t3
ref ? Question("tr3", 1, None, actorRef)
> [INFO] - Question request for tr2-1. Processing.
> AskTimeoutException(tr3)
I'm trying then to understand why I observe this behavior and what I'm doing wrong.
The akka-stream Kafka pipeline is:
Consumer
.plainSource(consumerSettings, subscription)
.map(DeserializeEvents.fromService)
.filter(_.eventType == classOf[Item].getName)
.via(askFlowExplicit)
.withAttributes(ActorAttributes.supervisionStrategy(decider()))
.map(
response =>
new ProducerRecord[String, OutputItem](
topics,
OutputItem(response.getClass.getName, response)
)
)
.log("Kafka Pipeline")
.runWith(Producer.plainSink(producerSettings))
The decider is a supervision strategy, that resumes the job on Serialisation
and Timeout
exceptions; askFlowExplicit
declares an ask request to an external actor and - hereby - I bumped with my issue.
val askFlowExplicit =
ActorFlow.ask[OutputItem, Question, Answer](askTarget) {
case (envelope, replyTo) =>
val item = Serdes.deserialize[Item](envelope.payload)
Question(item.trID, item.id, item.user, replyTo)
}
The pipeline starts up on Play! application bootstrap
@Singleton
class ApplicationStart @Inject()(
configuration: Configuration,
questionActor: ActorRef[QuestionActor.Question]
) {
private implicit val logger = Logger.apply(getClass)
implicit val mat = context
AlpakkaPipeline.run(configuration, questionActor)
}
The actor is a simple typed actor belonging to the same actor system and - right now - it is only forwarding the request coming from the stream towards another service.
class QuestionActor(
configuration: Configuration,
context: ActorContext[Question],
itemService: ItemService
) extends AbstractBehavior[Question](context) {
import QuestionActor._
implicit val ec: ExecutionContextExecutor = context.executionContext
private implicit val timeout: Timeout = ...
override def onMessage(msg: Question): Behavior[Question] = Behaviors.receive[Question] {
case (context, Question(trID, id, user, sender)) =>
log.info(s"Question request for ${msg.trID}-${msg.id}. Processing.")
itemService
.action(id, user)
.onComplete {
case Success(result) if result.isEmpty =>
log.info("Action executed")
msg.replyTo ! NothingHappened(trID, id)
case Failure(e) =>
log.error("Action failed.", e)
msg.replyTo ! FailedAction(trID, id, user, e.getMessage)
}
Behaviors.same
}
}
object QuestionActor {
final case class Question(
trID: String,
id: Int,
user: Option[UUID],
replyTo: ActorRef[Answer]
)
def apply(itemService: ItemService, configuration: Configuration): Behavior[Question] =
Behaviors.setup { context =>
context.setLoggerName(classOf[QuestionActor])
implicit val log: Logger = context.log
new QuestionActor(configuration, context)
}
}
It is built using runtime DI and Play!
class BootstrapModule(environment: Environment, configuration: Configuration)
extends AbstractModule
with AkkaGuiceSupport {
override def configure(): Unit = {
bind(new TypeLiteral[ActorRef[CloneWithSender]]() {})
.toProvider(classOf[QuestionActorProvider])
.asEagerSingleton()
bind(classOf[ApplicationStart]).asEagerSingleton()
}
}
private class Question @Inject()(
actorSystem: ActorSystem,
itemService: ItemService,
configuration: Configuration
) extends Provider[ActorRef[Question]] {
def get(): ActorRef[Question] = {
val behavior = QuestionActor(itemService, configuration)
actorSystem.spawn(behavior, "question-actor")
}
}
What I tried
QuestionActor
QuestionActor
QuestionActor
What I didn't
It looks to me as a threading problem right now, but I don't know where to go from here. Any help is really appreciated. Thank you in advance.
Upvotes: 1
Views: 405
Reputation: 7275
The problem is that you're combining AbstractBehavior
which provides onMessage
and there you define a new Behaviors.receive[Question]
behaviour. You have to use either one or the other.
Remove Behaviors.receive
as following
override def onMessage(msg: Question): Behavior[Question] = {
log.info(s"Question request for ${msg.trID}-${msg.id}. Processing.")
itemService
.action(id, user)
.onComplete {
case Success(result) if result.isEmpty =>
log.info("Action executed")
msg.replyTo ! NothingHappened(trID, id)
case Failure(e) =>
log.error("Action failed.", e)
msg.replyTo ! FailedAction(trID, id, user, e.getMessage)
}
Behaviors.same
}
}
AbstractBehavior.onMessage
is implementation of a behaviour. So, you receive a message via method argument, you're supposed to process it and return back a new Behaviour
, Behaviours.same
in your case.
But instead of processing the message, you create a new Behaviour
with Behaviors.receive
and register the callback of the Future to the original first message. Thus you see the log statement when second message arrives, which triggers the new behaviour.
If you want to use FP style definitions, you have to stick to Behaviors.xxx
helper methods only. If you choose OOP style, then you extend AbstractBehavior
. But you shouldn't do both.
Upvotes: 3