Reputation: 1154
Problem: I need to refresh a cache on a schedule (every x minutes), while concurrent gets against it can happen at any time.
Solutions tried:
I started with a TrieMap, since it provides thread safety, and used a scheduled thread pool to trigger the update:
import cats.effect.{ExitCode, IO, IOApp}
import java.util.concurrent.{Executors, ScheduledExecutorService}
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random

object ExploreTrieMap extends IOApp {

  // Schedules f at a fixed rate on the given executor.
  // The returned IO completes on the first tick; cancelling it cancels the schedule.
  def callForEvery(f: => Unit, d: FiniteDuration)
                  (implicit sc: ScheduledExecutorService): IO[Unit] = {
    IO.cancelable { cb =>
      val r = new Runnable {
        override def run(): Unit = cb(Right(f))
      }
      val scFut = sc.scheduleAtFixedRate(r, 0, d.length, d.unit)
      IO(scFut.cancel(false)).void
    }
  }

  val map = TrieMap.empty[String, String]

  override def run(args: List[String]): IO[ExitCode] = {
    implicit val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
    for {
      _ <- callForEvery(println(map.get("token")), 1.second)
      _ <- callForEvery(println(map.put("token", Random.nextString(10))), 3.seconds)
    } yield ExitCode.Success
  }
}
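For reference, a minimal standalone sketch (object and variable names are hypothetical) of the thread-safety property I'm relying on here: plain threads can read from the TrieMap while another thread writes to it, without any extra locking.

import scala.collection.concurrent.TrieMap
import scala.util.Random

object TrieMapConcurrentReads extends App {

  val cache = TrieMap.empty[String, String]

  // Writer: refreshes the cached value periodically.
  val writer = new Thread(() => {
    while (true) {
      cache.put("token", Random.nextString(10))
      Thread.sleep(3000)
    }
  })

  // Reader: concurrent gets need no extra locking.
  val reader = new Thread(() => {
    while (true) {
      println(cache.get("token"))
      Thread.sleep(1000)
    }
  })

  writer.setDaemon(true)
  reader.setDaemon(true)
  writer.start()
  reader.start()

  Thread.sleep(10000) // let the sketch run for ten seconds, then exit
}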
Then I created a pure cats-effect solution. Will the code below end up in a StackOverflowError?
import cats.effect.concurrent.Ref
import cats.effect.{ContextShift, ExitCode, Fiber, IO, IOApp}
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random

object ExploreCatFiber extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    for {
      ref <- Ref.of[IO, String]("")
      s <- scheduleAndPopulate(ref, 1.minute)
      r <- keepPollingUsingFiber(ref)
      _ <- s.join
      _ <- r.join
    } yield ExitCode.Success
  }

  def populate(): Future[String] = Future.successful(Random.nextString(10))

  val futPop = IO.fromFuture(IO(populate()))

  // Forks a fiber that sets the Ref, sleeps, then recursively forks (and joins) the next iteration.
  def scheduleAndPopulate(r: Ref[IO, String], duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
    (for {
      _ <- IO(println("Scheduled For Populating Ref"))
      res <- futPop
      _ <- r.set(res)
      _ <- IO.sleep(duration)
      rS <- scheduleAndPopulate(r, duration)(cs)
      _ <- rS.join
    } yield ()).start(cs)
  }

  // Forks a fiber that prints the current value every second, again recursing by forking and joining.
  def keepPollingUsingFiber(r: Ref[IO, String])(implicit cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
    (for {
      res <- r.get
      _ <- IO(println(res))
      _ <- IO.sleep(1.second)
      w <- keepPollingUsingFiber(r)(cs)
      _ <- w.join
    } yield ()).start(cs)
  }
}
I'm updating a Ref and using it like a concurrent cache that another fiber keeps refreshing, and I'm triggering the fiber creation via recursion. I know fibers can be used for stack-safe operations; in this case I'm joining on the previously created fiber, so I want to understand whether this code is safe.
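For reference (and as the answer below also notes), recursion expressed through IO's flatMap is trampolined, so recursion depth by itself does not grow the JVM call stack. A minimal standalone sketch (names are hypothetical) that recurses deeply without overflowing:

import cats.effect.{ExitCode, IO, IOApp}

object StackSafetySketch extends IOApp {

  // Recursion through flatMap: each step builds a new IO that the run loop
  // evaluates on the heap, so the JVM call stack does not grow with n.
  def countdown(n: Long): IO[Unit] =
    if (n <= 0) IO.unit
    else IO.unit.flatMap(_ => countdown(n - 1))

  override def run(args: List[String]): IO[ExitCode] =
    for {
      _ <- countdown(1000000L)
      _ <- IO(println("finished one million flatMap-recursive steps"))
    } yield ExitCode.Success
}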
Third solution (based on input from one of the answers): rather than forking a fiber inside each recursive call, fork once in the caller.
import cats.effect.concurrent.Ref
import cats.effect.{ContextShift, ExitCode, IO, IOApp}
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random

object ExploreCatFiberWithIO extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    for {
      ref <- Ref.of[IO, String]("")
      // Fork once here in the caller; the loops themselves just recurse.
      s <- scheduleAndPopulateWithIO(ref, 1.second).start
      r <- keepPollingUsingIO(ref).start
      _ <- s.join
      _ <- r.join
    } yield ExitCode.Success
  }

  def populate(): Future[String] = Future.successful(Random.nextString(10))

  val futPop = IO.fromFuture(IO(populate()))

  def scheduleAndPopulateWithIO(r: Ref[IO, String], duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Unit] = {
    for {
      _ <- IO(println("Scheduled For Populating Ref"))
      res <- futPop
      _ <- r.set(res)
      _ <- IO.sleep(duration)
      _ <- scheduleAndPopulateWithIO(r, duration)(cs)
    } yield ()
  }

  def keepPollingUsingIO(r: Ref[IO, String])(implicit cs: ContextShift[IO]): IO[Unit] = {
    for {
      res <- r.get
      _ <- IO(println(res))
      _ <- IO.sleep(1.second)
      _ <- keepPollingUsingIO(r)(cs)
    } yield ()
  }
}
Would love to know the pros and cons of the approaches discussed above.
Upvotes: 1
Views: 929
Reputation: 545
For the second approach, you can make it simpler by not forking a Fiber in scheduleAndPopulate and keepPollingUsingFiber. Instead, keep the recursive call and fork them in the caller. IO is stack-safe, so the recursive call won't blow up the stack.
You could use start to fork each, but it might be simpler to parTupled them. It's a variation of parMapN that forks each effect and gathers their results.
(Also, in your code you don't need to pass the implicit values, like cs, explicitly; the compiler will infer them for you.)
import cats.effect.concurrent.Ref
import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._
import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random

object ExploreCatFiber extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    for {
      ref <- Ref.of[IO, String]("")
      // parTupled forks both loops and runs them concurrently.
      _ <- (scheduleAndPopulate(ref, 1.minute), keepPollingUsingFiber(ref)).parTupled
    } yield ExitCode.Success
  }

  def populate(): Future[String] = Future.successful(Random.nextString(10))

  val futPop = IO.fromFuture(IO(populate()))

  def scheduleAndPopulate(r: Ref[IO, String], duration: FiniteDuration): IO[Unit] = {
    for {
      _ <- IO(println("Scheduled For Populating Ref"))
      res <- futPop
      _ <- r.set(res)
      _ <- IO.sleep(duration)
      _ <- scheduleAndPopulate(r, duration)
    } yield ()
  }

  def keepPollingUsingFiber(r: Ref[IO, String]): IO[Unit] = {
    for {
      res <- r.get
      _ <- IO(println(res))
      _ <- IO.sleep(1.second)
      _ <- keepPollingUsingFiber(r)
    } yield ()
  }
}
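As an illustration of the parMapN variant mentioned above, here is a minimal sketch (names are hypothetical) where two effects run in parallel and their results are combined with a function; parTupled is the special case that simply pairs the results up.

import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits._

object ParMapNSketch extends IOApp {

  val fetchCount: IO[Int]    = IO(42)        // hypothetical effect producing a count
  val fetchLabel: IO[String] = IO("tokens")  // hypothetical effect producing a label

  override def run(args: List[String]): IO[ExitCode] =
    for {
      // Both effects are forked and run concurrently; once both complete,
      // their results are combined with the supplied function.
      msg <- (fetchCount, fetchLabel).parMapN((n, label) => s"$n $label")
      _   <- IO(println(msg))
    } yield ExitCode.Success
}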
Upvotes: 3