blue-sky

Reputation: 53826

How to continually call a REST service using non-blocking code with Akka

I'm accessing data from a REST endpoint:

"https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"

To fetch the data once per second, I use an infinite while(true) loop that sends a message to the actor every second, which in turn triggers the REST request.

The actor to access the data is:

object ProductTickerRestActor {

  case class StringData(data: String)

}

class ProductTickerRestActor extends Actor {
  
  override def receive: PartialFunction[Any, Unit] = {

    case ProductTickerRestActor.StringData(data) =>
      try {
        println("in ProductTickerRestActor")
        val rData = scala.io.Source.fromURL("https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker").mkString
        println("rData : "+rData)

      }
      catch {
        case e: Exception =>
          println("Exception thrown in ProductTickerRestActor: " + e.getMessage)
      }

    case msg => println(s"I cannot understand ${msg.toString}")
  }
}

I start the application using:

object ExchangeModelDataApplication {

  def main(args: Array[String]): Unit = {

    val actorSystem = ActorSystemConfig.getActorSystem

    val priceDataActor = actorSystem.actorOf(Props[ProductTickerRestActor], "ProductTickerRestActor")
    val throttler = Throttlers.getThrottler(priceDataActor)
    while(true) {
      throttler ! ProductTickerRestActor.StringData("test")
      Thread.sleep(1000)
    }
  }

}

Throttler:

object Throttlers {


  implicit val materializer = ActorMaterializer.create(ActorSystemConfig.getActorSystem)

  def getThrottler(priceDataActor: ActorRef) = Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
    .throttle(1, 1.second)
    .to(Sink.actorRef(priceDataActor, NotUsed))
    .run()
}

How can I run the following code asynchronously, instead of blocking in an infinite loop?

throttler ! ProductTickerRestActor.StringData("test")
Thread.sleep(1000) 

Also, the throttler may be redundant in this case, since I'm already throttling requests within the loop.

Upvotes: 0

Views: 628

Answers (2)

Tim

Reputation: 27356

Sending a request every second is not a good idea. If, for some reason, a request is delayed, you are going to get a lot of requests piled up. Instead, send the next request one second after the previous request completes.

Because this code uses a synchronous GET request, you can just send the next request one second after mkString returns.
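As a rough sketch of that idea, the actor below sends itself the next message one second after the blocking call returns, so no external while(true) loop is needed. The names SelfSchedulingTicker, Fetch and TickerUrl are made up for illustration, and the call still blocks the actor's thread while the request is in flight.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.duration._

object SelfSchedulingTicker {
  // Hypothetical message that triggers a single fetch.
  case object Fetch
  val TickerUrl = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"
}

class SelfSchedulingTicker extends Actor {
  import SelfSchedulingTicker._
  import context.dispatcher // execution context used by the scheduler

  // Kick off the first request as soon as the actor starts.
  override def preStart(): Unit = self ! Fetch

  override def receive: Receive = {
    case Fetch =>
      try {
        val source = scala.io.Source.fromURL(TickerUrl)
        // Blocking read; close the source whether or not it succeeds.
        val rData = try source.mkString finally source.close()
        println(s"rData: $rData")
      } catch {
        case e: Exception => println(s"Request failed: ${e.getMessage}")
      } finally {
        // Schedule the next fetch one second after this one finished,
        // so slow requests can never pile up.
        context.system.scheduler.scheduleOnce(1.second, self, Fetch)
      }
  }
}

object SelfSchedulingTickerApp {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ticker")
    system.actorOf(Props(new SelfSchedulingTicker), "ticker")
  }
}
```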

But using synchronous requests is not a good way to consume a RESTful API in Akka. It blocks the actor receive method until the request is complete, which can eventually block the whole ActorSystem.

Instead, use Akka HTTP and singleRequest to perform an asynchronous request.

Http().singleRequest(HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker"))

This returns a Future. Make the new request one second after the request completes (e.g. using onComplete on the Future).
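A minimal sketch of that approach, assuming Akka 2.6.x with Akka HTTP 10.2.x: the object name PollAfterCompletion, the poll helper and the 5 second entity timeout are all illustrative choices, not part of any API. Each call to poll fires one request and schedules the next call only once the Future has completed, whether it succeeded or failed.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object PollAfterCompletion {
  val request: HttpRequest =
    HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker")

  // Hypothetical helper: performs one request, then re-schedules itself.
  def poll()(implicit system: ActorSystem): Unit = {
    import system.dispatcher

    Http()
      .singleRequest(request)
      // Read the whole body into memory; the timeout is an assumed value.
      .flatMap(_.entity.toStrict(5.seconds))
      .map(_.data.utf8String)
      .onComplete { result =>
        result match {
          case Success(rData) => println(s"rData: $rData")
          case Failure(e)     => println(s"Request failed: ${e.getMessage}")
        }
        // Next request starts one second after this one completed.
        system.scheduler.scheduleOnce(1.second)(poll())
      }
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("poller")
    poll()
  }
}
```

Nothing here blocks a thread while waiting for the response, and because the next request is only scheduled from onComplete, at most one request is in flight at any time.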

Not only is this safer and more asynchronous, it also provides much more control over the REST API call than fromURL.

Upvotes: 1

Levi Ramsey

Reputation: 20551

I would just use Akka Streams for this, along with Akka HTTP. Using Akka 2.6.x, something along these lines would be sufficient for 1 request/second:

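import akka.{Done, NotUsed} // Done is needed by the usage snippet further down
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.scaladsl._

import scala.concurrent.Future // also for the usage snippet
import scala.concurrent.duration._

object HTTPRepeatedly {
  // In Akka 2.6.x the implicit system also provides the stream materializer
  implicit val system: ActorSystem = ActorSystem()
  import system.dispatcher

  val sourceFromHttp: Source[String, NotUsed] =
    Source.repeat("test") // Not sure what "test" is actually used for here...
      .throttle(1, 1.second) // emit at most one element per second
      .map { str =>
        HttpRequest(uri = "https://api-public.sandbox.pro.coinbase.com/products/BTC-EUR/ticker")
      }.mapAsync(1) { req =>
        Http().singleRequest(req) // one request in flight at a time
      }.mapAsync(1)(_.entity.toStrict(1.minute)) // read the full response body
      .map(_.data.decodeString(java.nio.charset.StandardCharsets.UTF_8))
}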

Then you could, for instance, run it like this (for simplicity, put this in a main within HTTPRepeatedly so the implicits are in scope):

val done: Future[Done] =
  sourceFromHttp
    .take(10) // stop after 10 requests
    .runWith(Sink.foreach { rData => println(s"rData: $rData") })

scala.concurrent.Await.result(done, 11.minutes)

system.terminate()

Upvotes: 1
