Reputation: 2175
I have built a flow which takes a case class (Event
) and sends it to a HTTP endpoint and returns it back. It's implemented like so:
Flow[Event]
.mapAsync(16)(eventToHttpRequest)
.via(connection)
.map(handleResponse)
For reference, here is the handleResponse
method:
def handleResponse(endpoint: String)(responseTuple: (Try[HttpResponse], Event))(implicit actorSystem: ActorSystem, mat: ActorMaterializer) = {
responseTuple match {
case (Success(response), event) =>
response.status.intValue() match {
case code if code >= 500 =>
val message = s"Server side error sending event with id ${event.id} to ingestion gateway, status : ${response.status}"
LOG.error(message)
response.discardEntityBytes()
throw new UnexpectedException(message)
case code if (code >= 400) && (code < 500) =>
val message = s"Bad request sending event with id ${event.id} to ingestion gateway, status : ${response.status}"
LOG.error(message)
throw new UnexpectedException(message)
case _ =>
LOG.debug(s"Sent event with id ${event.id}, status : ${response.status}")
response.discardEntityBytes()
event
}
case (Failure(ex), justDataEvent) =>
LOG.error(s"Could not connect to $endpoint")
throw ex
}
}
I would like to monitor how long the HTTP request takes. "How long a request takes" could be thought of as:
In this case they will be very similar, as the response is small, but it would be good to know how to compute both.
Upvotes: 0
Views: 688
Reputation: 17973
For the request response cycle this can be implemented with an intermediate flow that adds a start time to the http request and event:
type EventAndTime = Tuple2[Event, Long]
val addQueryTime : Tuple2[HttpRequest, Event] => Tuple2[HttpRequest, EventAndTime] =
(tup) => (tup._1, (tup._2, java.lang.System.currentTimeMillis()))
val addQueryTimeFlow : Flow[(HttpRequest, Event), (HttpRequest, EventAndTime),_] =
Flow[(HttpRequest, Event)] map addQueryTime
Now handleRequest
will receive the Event
and the system time after going through the conn
:
Flow[Event]
.mapAsync(16)(eventToHttpRequest)
.via(addQueryTimeFlow)
.via(connection)
.map(handleResponse)
handleRequest
can just ask for the system time again and do a diff.
You can do a similar trick with response.entity
to time how long that takes:
val timeout : FiniteDuration = ???
case class EntityAndTime(strict : Strict, startTime : Long, endTime : Long)
val entity = response.entity
val entityAndTime : Future[EntityAndTime] =
Future(System.currentTimeMillis())
.flatMap { startTime =>
entity
.toStrict(timeout)
.map { strict =>
EntityAndTime(strict, startTime, System.currentTimeMillis())
}
}
Upvotes: 1