Reputation: 383
I'm a newbie trying to grasp the intuition behind fs2 Queues.
I'm trying to do a basic example for pulling data from a Stream[IO, Int]
. But the documentation for me is not enough as it directly dives into advanced stuff directly.
Here what I've done so far:
import cats.effect.{ ExitCode, IO, IOApp}
import fs2._
import fs2.concurrent.Queue
class QueueInt(q: Queue[IO, Int]) {
def startPushingtoQueue: Stream[IO, Unit] = {
Stream(1, 2, 3).covary[IO].through(q.enqueue)
q.dequeue.evalMap(n => IO.delay(println(s"Pulling element $n from Queue")))
}
}
object testingQueues extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val stream = for {
q <- Queue.bounded(10)
b = new QueueInt(q)
_ <- b.startPushingtoQueue.drain
} yield ()
}
}
Question 1: I'm getting No implicit argument of type Concurrent[F_],
Knowing that I'm not using any concurrent effect I can not seem to figure out what a am I missing?
Question 2: What can I do to print the results.
Question 3: Can someone direct me to some resources to learn fs2
Upvotes: 2
Views: 2183
Reputation: 22595
I found several issues in your code:
q <- Queue.bounded[IO, Unit](10) // it will fix your error with implicits
IO[Unit]
, but in order to make it run you'd have to return it from run
method. You'd also need to change the type from unit to ExitCode
:stream.as(ExitCode.Success)
startPushingToQueue
you're creating Steam
but you're not assigning it anywhere. It will just create a description of the stream, but it won't be run.What I think you wanted to achieve is to create on the method which will push elements to queue and another which would get elements from the queue and print them. Please check my solution:
import cats.effect.{ ExitCode, IO, IOApp}
import fs2._
import fs2.concurrent.Queue
import scala.concurrent.duration._
class QueueInt(q: Queue[IO, Int])(implicit timer: Timer[IO]) { //I need implicit timer for metered
def startPushingToQueue: Stream[IO, Unit] = Stream(1, 2, 3)
.covary[IO]
.evalTap(n => IO.delay(println(s"Pushing element $n to Queue"))) //eval tap evaluates effect on an element but doesn't change stream
.metered(500.millis) //it will create 0.5 delay between enqueueing elements of stream,
// I added it to make visible that elements can be pushed and pulled from queue concurrently
.through(q.enqueue)
def pullAndPrintElements: Stream[IO, Unit] = q.dequeue.evalMap(n => IO.delay(println(s"Pulling element $n from Queue")))
}
object testingQueues extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val program = for {
q <- Queue.bounded[IO, Int](10)
b = new QueueInt(q)
_ <- b.startPushingToQueue.compile.drain.start //start at the end will start running stream in another Fiber
_ <- b.pullAndPrintElements.compile.drain //compile.draing compiles stream into io byt pulling all elements.
} yield ()
program.as(ExitCode.Success)
}
}
On console, you will see lines telling about pushing and pulling from queue interleaved.
If you remove start
you will see that firstly stream from startPushingToQueue
finishes after pushing all elements, and only then pullAndPrintElements
starts.
If you're looking for good sources to learn fs2, I would suggest that you should start with checking out fs2-related talks. Prefer newer talks, than the old one, because they could reference the older API.
You should also check guide on fs2 documentation.
Upvotes: 7