Reputation: 561
I have a stream built from a sequence of java.io.File, and I use flatMapConcat
to create a new Source for each file, something like this:
def func(files: List[File]) =
  Source(files)
    .map(f => openFile(f))
    .flatMapConcat(f => Source.fromPublisher(SomePublisher(f)))
    .grouped(10)
    .via(SomeFlow)
    .runWith(Sink.ignore)
Is there a simple way to close each file once its stream has finished? SomePublisher()
cannot close it.
Upvotes: 5
Views: 850
Reputation: 561
I found one good way to solve my problem, though there are probably others. If you have a different approach, I would like to see it too.
def someSource(file: File) = {
  val f = openFile(file)
  Source
    .fromPublisher(SomePublisher(f))
    .transform(() => new PushStage[?, ?] {
      override def onPush(elem: ?, ctx: Context[?]): SyncDirective = ctx.push(elem)
      // Close the file when this stage stops.
      override def postStop(): Unit = {
        f.close()
        super.postStop()
      }
    })
}
def func(files: List[File]) =
  Source(files)
    .flatMapConcat(someSource)
    .grouped(10)
    .via(SomeFlow)
    .runWith(Sink.ignore)
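Another way to get the same effect, sketched here under assumptions, is to hook the close into the termination of each inner source with watchTermination instead of a custom stage. The helper name closingSource is hypothetical, the resource is assumed to be a java.io.Closeable, and this assumes an Akka Streams version that provides watchTermination.

```scala
import java.io.Closeable

import akka.NotUsed
import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher

import scala.concurrent.ExecutionContext

object ClosingSource {
  // Hypothetical helper: wraps a publisher-backed source so that `resource`
  // is closed once the inner stream terminates (completion or failure).
  def closingSource[T](resource: Closeable, publisher: Publisher[T])(
      implicit ec: ExecutionContext): Source[T, NotUsed] =
    Source
      .fromPublisher(publisher)
      .watchTermination() { (mat, done) =>
        // `done` completes when this inner source terminates.
        done.onComplete(_ => resource.close())
        mat
      }
}
```

The close runs asynchronously on the supplied ExecutionContext, so it may happen slightly after the downstream sees completion.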
Upvotes: 2
Reputation: 450
If I understand you correctly, for each file you create a database object that opens the file. Since you are the one opening the db connections, you are responsible for closing them. Because you are working with a finite list of files, you can store all the db connections in a sequence, run your stream, and close every connection after streaming ends.
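A minimal sketch of this first option, assuming the opened objects implement java.io.Closeable and that running the stream yields a Future (as runWith(Sink.ignore) does). The object and method names are hypothetical, and the helper uses only the Scala standard library.

```scala
import java.io.Closeable

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import scala.util.control.NonFatal

object CloseAfterRun {
  // Runs `run` against the already-opened resources and closes all of them
  // once the returned future completes, whether it succeeded or failed.
  def runThenCloseAll[R <: Closeable, T](resources: Seq[R])(run: Seq[R] => Future[T])(
      implicit ec: ExecutionContext): Future[T] = {
    val result =
      try run(resources)
      catch { case NonFatal(e) => Future.failed[T](e) } // `run` threw before returning a future
    // Try(...) keeps one failing close() from skipping the remaining ones.
    result.andThen { case _ => resources.foreach(r => Try(r.close())) }
  }
}
```

With the pipeline from the question, `run` would presumably be something like `opened => Source(opened).flatMapConcat(...).runWith(Sink.ignore)`. The cost is that every file stays open for the whole run.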
An alternative is to write your own publisher that takes a file name, opens the db connection, streams from it, and closes the connection. This second option also lets you stream from an infinite list of files.
If you want code snippets from me, give me the full source of your function and I'll update my answer.
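As a generic illustration of the second option (open, stream, close in one place), here is a sketch using Source.unfoldResource rather than a hand-written publisher. Reading lines with a BufferedReader stands in for the asker's database object, which is not shown in the question, and the names FileSources, lines, and allLines are hypothetical.

```scala
import java.io.{BufferedReader, File, FileReader}

import akka.NotUsed
import akka.stream.scaladsl.Source

object FileSources {
  // Emits the lines of `file`. The reader is opened when the source is
  // materialized and closed when the stream completes, fails, or is cancelled.
  def lines(file: File): Source[String, NotUsed] =
    Source.unfoldResource[String, BufferedReader](
      () => new BufferedReader(new FileReader(file)), // open
      reader => Option(reader.readLine()),            // read; None ends the stream
      reader => reader.close()                        // close
    )

  // One inner source per file, consumed one after another.
  def allLines(files: List[File]): Source[String, NotUsed] =
    Source(files).flatMapConcat(lines)
}
```

Because each file is opened only when its inner source starts, at most one file is open at a time, which is what makes this workable for an unbounded list of files.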
Upvotes: 1