Mark J Miller

Reputation: 4871

Reading from postgres using Akka Streams 2.4.2 and Slick 3.0

Trying out the newly minted Akka Streams. It seems to be working except for one small thing - there's no output.

I have the following table definition:

case class my_stream(id: Int, value: String)

class Streams(tag: Tag) extends Table[my_stream](tag, "my_stream") {
  def id = column[Int]("id")
  def value = column[String]("value")
  def * = (id, value) <> (my_stream.tupled, my_stream.unapply)
}

And I'm trying to output the contents of the table to stdout like this:

def main(args: Array[String]) : Unit = {
  implicit val system = ActorSystem("Subscriber")
  implicit val materializer = ActorMaterializer()

  val strm = TableQuery[Streams]
  val db = Database.forConfig("pg-postgres")

  try{
    var src = Source.fromPublisher(db.stream(strm.result))
    src.runForeach(r => println(s"${r.id},${r.value}"))(materializer)
  } finally {
    system.shutdown
    db.close
  }
}

I have verified that the query is being run by configuring debug logging. However, all I get is this:

08:59:24.099 [main] INFO  com.zaxxer.hikari.HikariDataSource - pg-postgres - is starting.
08:59:24.428 [main] INFO  com.zaxxer.hikari.pool.HikariPool - pg-postgres - is closing down.

Upvotes: 4

Views: 1943

Answers (3)

JavierCane

Reputation: 2412

In case it helps anyone who hits this same issue with MySQL: you need to enable the driver's streaming support "manually":

def enableStream(statement: java.sql.Statement): Unit = {
  statement match {
    case s: com.mysql.jdbc.StatementImpl => s.enableStreamingResults()
    case _ =>
  }
}

val publisher = sourceDb.stream(query.result.withStatementParameters(statementInit = enableStream))

Source: http://www.slideshare.net/kazukinegoro5/akka-streams-100-scalamatsuri

Upvotes: 1

Mark J Miller

Reputation: 4871

I ended up using @ViktorKlang's answer and just wrapped the run in an Await.result. I also found an alternative approach in the docs, which demonstrates using the Reactive Streams publisher and subscriber interfaces:

The stream method returns a DatabasePublisher[T], and Source.fromPublisher returns a Source[T, NotUsed]. According to the release notes, NotUsed is a replacement for Unit: it is the source's materialized value type, not the element type, so elements still reach the Sink. Instead of using runForeach, you can attach your own subscriber.

Since Slick implements the Reactive Streams interfaces rather than the Akka Streams interfaces, you need to use the fromPublisher and fromSubscriber integration points. For the subscriber side, that means implementing the org.reactivestreams.Subscriber[T] interface.

Here's a quick and dirty Subscriber[T] implementation which simply calls println:

import org.reactivestreams.{Subscriber, Subscription}

class MyStreamWriter extends Subscriber[my_stream] {
  // Set in onSubscribe; used to request further elements.
  private var sub: Option[Subscription] = None

  override def onNext(t: my_stream): Unit = {
    println(t.value)
    // Ask for the next element, one at a time.
    sub.foreach(_.request(1))
  }

  override def onError(throwable: Throwable): Unit = {
    println(throwable.getMessage)
  }

  override def onSubscribe(subscription: Subscription): Unit = {
    sub = Some(subscription)
    // Signal initial demand; nothing is sent until this is called.
    subscription.request(1)
  }

  override def onComplete(): Unit = {
    println("ALL DONE!")
  }
}

Make sure you call Subscription.request(Long) in onSubscribe and again in onNext. Without the first call nothing is sent; without the second the stream stalls after the initially requested elements and you won't get the full set of results.
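To make the effect of the request calls described above concrete, here is a minimal sketch that uses only the Scala standard library. The Subscription and Subscriber traits and the ListPublisher class are hypothetical, simplified stand-ins defined locally for illustration; they are not the org.reactivestreams types and not what Slick does internally. The publisher only emits while there is outstanding demand:

```scala
object DemandSketch {
  // Hypothetical, simplified stand-ins for the Reactive Streams interfaces.
  trait Subscription { def request(n: Long): Unit }
  trait Subscriber[T] {
    def onSubscribe(s: Subscription): Unit
    def onNext(t: T): Unit
    def onComplete(): Unit
  }

  // Toy synchronous publisher: emits an element only while demand > 0.
  final class ListPublisher[T](items: List[T]) {
    def subscribe(s: Subscriber[T]): Unit = {
      var remaining = items
      var demand = 0L
      var draining = false
      var done = false
      val subscription = new Subscription {
        def request(n: Long): Unit = {
          demand += n
          // Guard against re-entrant calls made from inside onNext.
          if (!draining) {
            draining = true
            while (demand > 0 && remaining.nonEmpty) {
              val next = remaining.head
              remaining = remaining.tail
              demand -= 1
              s.onNext(next)
            }
            draining = false
            if (remaining.isEmpty && !done) { done = true; s.onComplete() }
          }
        }
      }
      s.onSubscribe(subscription)
    }
  }

  // Returns the received elements and whether onComplete was signalled.
  def collect(items: List[String], requestMore: Boolean): (List[String], Boolean) = {
    val received = List.newBuilder[String]
    var completed = false
    new ListPublisher(items).subscribe(new Subscriber[String] {
      private var sub: Option[Subscription] = None
      def onSubscribe(s: Subscription): Unit = { sub = Some(s); s.request(1) }
      def onNext(t: String): Unit = {
        received += t
        if (requestMore) sub.foreach(_.request(1))
      }
      def onComplete(): Unit = { completed = true }
    })
    (received.result(), completed)
  }

  def main(args: Array[String]): Unit = {
    println(collect(List("a", "b", "c"), requestMore = true))
    println(collect(List("a", "b", "c"), requestMore = false))
  }
}
```

With requestMore = true every element arrives and completion is signalled; with requestMore = false only the first element arrives and the subscriber never sees onComplete.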

And here's how you use it:

def main(args: Array[String]) : Unit = {
  implicit val system = ActorSystem("Subscriber")
  implicit val materializer = ActorMaterializer()

  val strm = TableQuery[Streams]
  val db = Database.forConfig("pg-postgres")

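  try{
    val src = Source.fromPublisher(db.stream(strm.result))
    // Sink.fromSubscriber materializes NotUsed, so run() returns no Future to wait on.
    val flow = src.to(Sink.fromSubscriber(new MyStreamWriter()))
    flow.run()
  } finally {
    // Caveat: this runs as soon as run() returns, possibly before the stream completes.
    system.shutdown
    db.close
  }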
}

I'm still trying to figure this out so I welcome any feedback. Thanks!

Upvotes: 0

Viktor Klang

Reputation: 26579

The cause is that Akka Streams is asynchronous: runForeach returns a Future that is completed once the stream completes. That Future is not being handled, so system.shutdown and db.close execute immediately instead of after the stream completes.
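As a rough illustration of handling that Future, here is a sketch that uses only scala.concurrent so it runs without Akka or Slick. The runForeachStandIn helper is a hypothetical stand-in for runForeach (it just processes a Seq on another thread and returns a Future), and the 10 second timeout is an arbitrary assumption:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitBeforeShutdown {
  // Hypothetical stand-in for src.runForeach: handles the elements on another
  // thread and returns a Future that completes once all of them are handled.
  def runForeachStandIn[A](elems: Seq[A])(f: A => Unit): Future[Unit] =
    Future(elems.foreach(f))

  // Returns the lines produced before the cleanup in finally was reached.
  def run(rows: Seq[(Int, String)]): Vector[String] = {
    val out = Vector.newBuilder[String]
    try {
      val done = runForeachStandIn(rows) { case (id, value) =>
        out += s"$id,$value"
      }
      // Block until processing has finished; without this line the finally
      // block would run while elements may still be in flight.
      Await.result(done, 10.seconds)
    } finally {
      // system.shutdown and db.close would go here.
    }
    out.result()
  }

  def main(args: Array[String]): Unit =
    run(Seq(1 -> "a", 2 -> "b")).foreach(println)
}
```

In the question's code the equivalent change would be to wrap the runForeach call in Await.result with a suitable timeout before the finally block runs.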

Upvotes: 3
