Reputation: 4743
I'm trying to create a Source that provides OAuth2 tokens and that also takes care of refreshing expired tokens. Currently my code looks a bit like this
case class Token(expires: Instant = Instant.now().plus(100, ChronoUnit.MILLIS)){
def expired = Instant.now().isAfter(expires)
}
Source
.repeat()
.mapAsync(1){ _ =>
println(" -> token req")
// this fakes an async token request to the token service
Future{
Thread.sleep(500)
println(" <- token resp")
Token()
}
}
.mapAsync(1){ token =>
println(" -> req with token auth")
if(token.expired){
println("!!! Received expired token")
}
// this is the actual call that needs the token
println("making call")
Future{
Thread.sleep(2000)
println(" <- req resp")
"OK"
}
}
.take(2)
.runWith(Sink.ignore)
.recover{case e => ()}
.flatMap{ _ =>
system.terminate()
}
Output of this code looks like this
root -> token req
root <- token resp
root -> token req
root -> req with token auth
root making call
root <- token resp
root -> token req
root <- token resp
root -> token req
root <- token resp
root -> token req
root <- req resp
root -> req with token auth
root !!! Received expired token
root making call
root <- token resp
root -> token req
root <- token resp
root -> token req
root <- token resp
root <- req resp
root -> req with token auth
root !!! Received expired token
root making call
root ... finished with exit code 0
Clearly this mapAsync(1) is producing demand when not expected (prefetching?)
There are 2 issues:
So how do I create a true pull stream that behaves like this function?
def tokenSource: () => Future[Token]
Upvotes: 2
Views: 923
Reputation: 17933
If you are purposefully trying to avoid pre-fetching and queuing then I think scala.collection.immutable.Stream
, or Iterator, is a better solution than akka Stream.
Below is an example implementation that avoids the pitfalls you enumerated in your question. (Note: I used an ActorSystem
to create the ExecutionContext, via dispatcher
, to prevent the application from exiting before the sleep
invocations have time to complete. I'm taking advantage of the fact that an ActorSystem does not shut down just because the main function reaches the end of the expression definition.)
import scala.collection.immutable.Stream
import scala.concurrent.Future
object ScalaStreamTest extends App {
case class Token(expires: Long = System.currentTimeMillis() + 100){
def expired = System.currentTimeMillis() > expires
}
val actorSystem = akka.actor.ActorSystem()
import actorSystem.dispatcher
def createToken = Future {
Thread.sleep(500)
println(" <- token resp")
Token()
}
def checkExpiration(token : Future[Token]) = token map { t =>
println(" -> req with token auth")
if(t.expired){println("!!! Received expired token")}
t
}
def makeCall(token : Future[Token]) = token flatMap { t =>
println("making call")
Future {
Thread.sleep(2000)
println(" <- req resp")
"OK"
}
}
val stream = Stream.continually(createToken)
.map(checkExpiration)
.map(makeCall)
.take(2)
.force
}//end object ScalaStreamTest
A force
call is necessary because a Stream is lazy and therefore all method calls before the force (namely: continually, map, & take) are also lazy. No computation would occur on the lazy Stream unless a reducer is called or the Stream is explicitly told to via force.
Upvotes: 3
Reputation: 26579
Akka Streams always prefetch to keep the pipeline saturated.
To get what you want I suggest that you create a Source[Token] that emits new Tokens when the old one expires rather than on request. Then you zip your Source of data with the source of Tokens and use the result of that.
Upvotes: 0