Reputation: 4871
Trying out the newly minted Akka Streams. It seems to be working except for one small thing - there's no output.
I have the following table definition:
case class my_stream(id: Int, value: String)
class Streams(tag: Tag) extends Table[my_stream](tag, "my_stream") {
  def id = column[Int]("id")
  def value = column[String]("value")
  def * = (id, value) <> (my_stream.tupled, my_stream.unapply)
}
And I'm trying to output the contents of the table to stdout like this:
def main(args: Array[String]) : Unit = {
  implicit val system = ActorSystem("Subscriber")
  implicit val materializer = ActorMaterializer()
  val strm = TableQuery[Streams]
  val db = Database.forConfig("pg-postgres")
  try{
    var src = Source.fromPublisher(db.stream(strm.result))
    src.runForeach(r => println(s"${r.id},${r.value}"))(materializer)
  } finally {
    system.shutdown
    db.close
  }
}
I have verified that the query is being run by configuring debug logging. However, all I get is this:
08:59:24.099 [main] INFO com.zaxxer.hikari.HikariDataSource - pg-postgres - is starting.
08:59:24.428 [main] INFO com.zaxxer.hikari.pool.HikariPool - pg-postgres - is closing down.
Upvotes: 4
Views: 1943
Reputation: 2412
Just in case it helps anyone searching for this same issue with MySQL: you need to enable the driver's streaming support "manually":
def enableStream(statement: java.sql.Statement): Unit = {
  statement match {
    case s: com.mysql.jdbc.StatementImpl => s.enableStreamingResults()
    case _ =>
  }
}
val publisher = sourceDb.stream(query.result.withStatementParameters(statementInit = enableStream))
Source: http://www.slideshare.net/kazukinegoro5/akka-streams-100-scalamatsuri
Upvotes: 1
Reputation: 4871
Ended up using @ViktorKlang's answer and just wrapped the run with an Await.result. I also found an alternative answer in the docs which demonstrates using the reactive streams publisher and subscriber interfaces:
The stream method returns a DatabasePublisher[T] and Source.fromPublisher returns a Source[T, NotUsed]. This means you have to attach a subscriber instead of using runForeach - according to the release notes NotUsed is a replacement for Unit, which means nothing gets passed to the Sink.
Since Slick implements the reactive streams interface and not the Akka Stream interfaces, you need to use the fromPublisher and fromSubscriber integration points. That means you need to implement the org.reactivestreams.Subscriber[T] interface.
Here's a quick and dirty Subscriber[T] implementation which simply calls println:
import org.reactivestreams.{Subscriber, Subscription}

class MyStreamWriter extends Subscriber[my_stream] {
  // Set in onSubscribe; needed later to request each further element.
  private var sub: Option[Subscription] = None

  override def onNext(t: my_stream): Unit = {
    println(t.value)
    // Ask for the next element once this one has been handled.
    sub.foreach(_.request(1))
  }

  override def onError(throwable: Throwable): Unit = {
    println(throwable.getMessage)
  }

  override def onSubscribe(subscription: Subscription): Unit = {
    sub = Some(subscription)
    // Nothing is delivered until the first request is made.
    subscription.request(1)
  }

  override def onComplete(): Unit = {
    println("ALL DONE!")
  }
}
Make sure you call the Subscription.request(Long) method in onSubscribe and then again in onNext to ask for data: without the first call nothing will be sent, and without the second you won't get the full set of results.
And here's how you use it:
def main(args: Array[String]) : Unit = {
  implicit val system = ActorSystem("Subscriber")
  implicit val materializer = ActorMaterializer()
  val strm = TableQuery[Streams]
  val db = Database.forConfig("pg-postgres")
  try{
    val src = Source.fromPublisher(db.stream(strm.result))
    val flow = src.to(Sink.fromSubscriber(new MyStreamWriter()))
    flow.run()
  } finally {
    system.shutdown
    db.close
  }
}
I'm still trying to figure this out so I welcome any feedback. Thanks!
Upvotes: 0
Reputation: 26579
The cause is that Akka Streams is asynchronous: runForeach returns a Future which will be completed once the stream completes, but that Future is not being handled, and as such system.shutdown and db.close execute immediately instead of after the stream completes.
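A minimal sketch of one way to handle that Future, assuming it is acceptable to block the main thread. To keep it self-contained, an in-memory Source stands in for Source.fromPublisher(db.stream(strm.result)); the object name AwaitStreamExample, the sample rows, and the 30 second timeout are made up for illustration, and system.terminate() is used in place of system.shutdown.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

import scala.concurrent.Await
import scala.concurrent.duration._

object AwaitStreamExample {
  // Runs the stream and only returns once it has completed.
  def run(): Done = {
    implicit val system: ActorSystem = ActorSystem("Subscriber")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Assumption: stand-in rows instead of the Slick publisher.
      val src = Source(List(1 -> "a", 2 -> "b"))
      // runForeach returns a Future[Done] that completes when the stream ends.
      val done = src.runForeach { case (id, value) => println(s"$id,$value") }
      // Block here so the cleanup in finally runs after the stream has finished.
      Await.result(done, 30.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = run()
}
```

In the original program the db.close call would sit next to system.terminate() in the finally block. If blocking is not wanted, registering the cleanup with onComplete on the returned Future should work as well.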
Upvotes: 3