Reputation: 151
I'm using akka http and streams to fulfill API requests. When the request is invalid I want to return a 400 and if it's valid, i want to proceed with the computation and return the result afterwards. The Problem I'm facing is, that the Payload I'm receiving from the POST request is a Source and I cannot convert it into 2 Streams (one for valid and one for invalid input data) and complete the request correct.
path("alarms")(
post(entity(asSourceOf[String]) { message =>
val flow = message.via(Flow[String].map((it) =>
Try(if valid(it) then it else throw Exception("Wrong input"))
))
complete(repository.create(flow).run) // <-- here I only want to pass all events that are valid. For the other events complete(HttpResponse(NotFound, entity = "Invalid input")) should be used
})
)
/// The signature of the repository.create looks like that
def create(message: Source[String, NotUsed]): RunnableGraph[Future[Done]]
Upvotes: 0
Views: 209
Reputation: 558
You may use the akka-http handleExceptions
directive, sth like this:
val exceptionHandler = ExceptionHandler {
case ex: RuntimeException =>
complete(HttpResponse(NotFound, entity = "Invalid input"))
}
path("alarms")(
handleExceptions(exceptionHandler) {
post(entity(asSourceOf[String]) { message =>
val flow = message.via(Flow[String].map((it) =>
Try(if valid(it) then it else throw new RuntimeException("Invalid input"))
))
complete(repository.create(flow).run)
})
}
)
Doc:
https://doc.akka.io/docs/akka-http/current/routing-dsl/exception-handling.html
There is also handleRejections
directive for even more control - see https://doc.akka.io/docs/akka-http/current/routing-dsl/directives/execution-directives/handleRejections.html
Upvotes: 1