Leonard
Leonard

Reputation: 561

How can I close file after stream ending?

I have a stream of sequence of java.io.File. And I use flatMapConcat for create new Source of file, something like that:

def func(files: List[File]) =
  Source(files)
    .map(f => openFile(f))
    .flatMapConcat(f => Source.fromPublisher(SomePublisher(f)))
    .grouped(10)
    .via(SomeFlow)
    .runWith(Sink.ignore)

Is there a simple way to close each file after stream ending? SomePublisher() cannot close it.

Upvotes: 5

Views: 850

Answers (2)

Leonard
Leonard

Reputation: 561

So I found one good way of many ways to solve my problem, but if you have another way I would like to see it also.

def someSource(file: File) = {
  val f = openFile(file)

  Source
    .fromPublisher(SomePublisher(f))
    .transform(() => new PushStage[?, ?] {
      override def onPush(elem: ?, ctx: Context[?]): SyncDirective = ctx.push(elem)

      override def postStop(): Unit = {
        f.close()
        super.postStop()
      }
    }
}

def func(files: List[File]) =
  Source(files)
    .flatMapConcat(someSource)
    .grouped(10)
    .via(SomeFlow)
    .runWith(Sink.ignore)

Upvotes: 2

Yury Sukhoverkhov
Yury Sukhoverkhov

Reputation: 450

So, if I understand you correctly you do the following: for each file YOU create database object what opens the file. So since you are opening db connections in your code you are responsible for closing it. Since you are working with finite list of files you may store all db connections in a sequence, run your stream and close all connection after streaming ends.

Alternative way is to make your own publisher what will get file name, open db connection, stream from it, close db connection. Second option will allow you to stream from infinite list of files.

If you want code sniplets from me give me your full source for the function and I'll update it.

Upvotes: 1

Related Questions