Reputation: 2150
I'm trying to interrupt an fs2 stream with SignalRef. I set up and run the stream with the following. The stream should run when switch
contains false
and should interrupt when switch
contains true
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
implicit val contextShift = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)
val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)
val program: Stream[IO, Unit] = {
val program: Stream[IO, Unit] =
Stream
.repeatEval(IO{
println(java.time.LocalTime.now)
println(switch.map(_.get).unsafeRunSync.unsafeRunSync)
})
.metered(1.second)
program
.interruptWhen(Stream.repeatEval(switch.map(_.get).unsafeRunSync))
}
program.compile.drain.unsafeRunAsync(() => _)
I then attempt to interrupt the stream with
switch.map(_.set(true).unsafeRunSync)
However, the stream keeps going. In stdout I see
15:58:33.048504
false
15:58:34.048760
false
15:58:35.049063
false
15:58:36.049356
false
15:58:37.049615
false
So apparently it's not picking up the switch to true?
Upvotes: 4
Views: 2423
Reputation: 22595
There are several problems with your code.
First of all, please check the signature of switch
:
val switch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)
Type of SignallingRef
is wrapped into IO
. It means that the creation of new SignallingRef
is suspended until IO
monad is evaluated (implicitly by IO
program flow or explicitly by calling unsafeRunXXX
). So more suitable name of this value would be probably createSwitch
.
When you use switch.map(_.get).unsafeRunSync
actually every time you're creating a new instance of SignallingRef
with its default value false
, so it's never get's evaluated to true.
The rule of thumb is that you should (almost) never call unsafeRunXXX
methods up until you finish assembling your IO/Stream program and then you should run that kind of method once.
The right way to do it would be to create switch
once in for-comprehension and then you could pass it to program
as an argument.
I refactored your code a little bit to do what I think you intended to do and also added some clarification comments.
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.SignallingRef
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
implicit val contextShift = IO.contextShift(ExecutionContext.global)
implicit val timer = IO.timer(ExecutionContext.global)
//I changed name to createSwitch, which I think reflect reality more
val createSwitch: IO[SignallingRef[IO, Boolean]] = SignallingRef[IO, Boolean](false)
val program: Stream[IO, Unit] = {
//I pass here switch as method's param
def program(switch: SignallingRef[IO, Boolean]): Stream[IO, Unit] =
Stream
.repeatEval {
for { //I used for-comprehension to split IO into 3 statements
switchValue <- switch.get //here I get value of switch
_ <- IO(println(java.time.LocalTime.now)) //I split println into 2 separate statements
_ <- IO(println(switchValue)) //because it's not a good practive to run 2 effect in single IO
} yield ()
}
.metered(1.second)
for {
switch <- Stream.eval(createSwitch)
//here I create effect to set switch to true after 10 seconds and then use start to run it
//separate fiber in background. If I didn't do that it would just wait 10 sec and only then run program
_ <- Stream.eval(switch.set(true).delayBy(10.seconds).start)
_ <- program(switch).interruptWhen(switch)
} yield ()
}
program.compile.drain.unsafeRunSync()
Upvotes: 7
Reputation: 27535
Personally, I use my own kill switch for things like that:
final case class KillSwitch[F[_]](stream: Stream[F, Unit], switch: F[Unit])
object KillSwitch {
def apply[F[_]: Sync]: F[KillSwitch[F]] =
Ref.of(true).map(switch => KillSwitch(Stream.repeatEval(switch.get).takeWhile(identity).void, switch.set(false)))
}
I use it more or less like:
for {
KillSwitch(kfStream, switch) <- KillSwitch[F]
streamFiber <- yourStream.zip(kfStream).map(_._1).compile.drain.start
// after a while
_ <- switch // closes kfStream
result <- streamFiber.join
} yield result
(let's say it's a pseudocode to show the idea).
Upvotes: 4