Slick Vick

Reputation: 55

Future returned by Slick run method completes successfully before rows have been inserted

I am working on a Scala app using Slick and PostgreSQL for storage. I have a method foo which reads data from a CSV file and inserts about 10,000 rows into the database. After the Future of the row insertion action completes, another method bar is called which retrieves these rows from the database and performs some actions on them. This is where the problem lies: no rows are actually retrieved from the database, as no rows have been inserted by the time the Future completes.

From what I could gather from searching for an answer and reading the official documentation, the Future is not supposed to complete before the insert statement has executed successfully. If I add Thread.sleep(30000) to bar, giving the insert statement time to execute first, the methods produce the expected result. For obvious reasons I would rather not do that, so I am looking for alternatives.

The following diagram illustrates the program flow once initial method doStuff is called:

[Image: the program flow diagram]

doStuff calls foo, which loads the data and stores it in the database before returning a Future. In doStuff this Future is then mapped and a call to bar is made. bar retrieves the rows from the database and processes them. However, since no rows have been inserted at the point where bar is called, no data is processed.

The doStuff method:

def doStuff(csvFile: File): Future[Unit] = {
  fooService.foo(csvFile)
    .map(_ => {
      csvFile.delete()
      barService.bar()
    })
}

The foo method:

def foo(file: File): Future[Unit] = {
  val reader = CSVReader.open(file)
  fooStorage.truncateFooData().map(_ => {
    val foos = for {
      line <- reader.iterator
      if line.head != "bad1" && line.head != "bad2"
    } yield parseFooData(line)
    fooStorage.saveFooDataBulk(foos.toSeq)
  })
}

How I am using Slick to insert the rows:

override def saveFooDataBulk(fooSeq: Seq[Foo]): Future[Seq[Foo]] =
  db.run(DBIO.seq(fooQuery ++= fooSeq)).map(_ => fooSeq)

I expect bar to be called only once all the rows have been inserted into the database, and not sooner. Currently, however, the Future from Slick seems to complete too soon. In case it's relevant: the doStuff method is called when a request is sent to an Akka HTTP endpoint. The application and the database are running in two different Docker containers. What am I doing wrong?

Also I can't get over how appropriate the username I picked a year and a half ago is now.

Upvotes: 2

Views: 265

Answers (1)

Andrey Tyukin

Reputation: 44918

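Replace map with flatMap in the def foo. Otherwise, the function passed to map starts a Future[Seq[Foo]] and then immediately returns a (), so the Future returned by foo completes without waiting for the insert, and that () is then discarded by your doStuff. Something like this: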

fooStorage
  .truncateFooData()
  .flatMap(_ => {
    /* stuff... */
    fooStorage.saveFooDataBulk(foos.toSeq)
  })
  .map(_ => ())

I didn't test it, but in any case, starting a Future in the middle of another Future.map and then immediately returning () doesn't feel quite right.
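
To see the difference between map and flatMap in isolation, here is a minimal sketch that uses only plain scala.concurrent Futures, with no Slick involved. The names MapVsFlatMap, truncate, insert and dbReady are hypothetical stand-ins and not part of the original code. The simulated insert finishes only once dbReady completes, so the timing is controlled by the demo rather than by a sleep.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MapVsFlatMap {
  // Hypothetical stand-in for fooStorage.truncateFooData().
  def truncate(): Future[Unit] = Future.successful(())

  // Hypothetical stand-in for saveFooDataBulk: the "insert" finishes only
  // once `dbReady` completes, and then sets the `inserted` flag.
  def insert(dbReady: Future[Unit], inserted: AtomicBoolean): Future[Unit] =
    dbReady.map(_ => inserted.set(true))

  // Shape of the original foo: the inner Future is started and then dropped,
  // so the outer Future does not wait for it.
  def withMap(dbReady: Future[Unit], inserted: AtomicBoolean): Future[Unit] =
    truncate().map { _ =>
      insert(dbReady, inserted) // result discarded
      ()
    }

  // Shape of the fix: the outer Future completes only after the insert does.
  def withFlatMap(dbReady: Future[Unit], inserted: AtomicBoolean): Future[Unit] =
    truncate().flatMap(_ => insert(dbReady, inserted))

  def main(args: Array[String]): Unit = {
    // map: the outer Future completes although the insert is still pending.
    val gate1 = Promise[Unit]()
    val flag1 = new AtomicBoolean(false)
    Await.result(withMap(gate1.future, flag1), 5.seconds)
    println(s"map: inserted = ${flag1.get}") // false

    // flatMap: the outer Future stays incomplete until the insert is released.
    val gate2 = Promise[Unit]()
    val flag2 = new AtomicBoolean(false)
    val fixed = withFlatMap(gate2.future, flag2)
    println(s"flatMap: completed early = ${fixed.isCompleted}") // false
    gate2.success(())
    Await.result(fixed, 5.seconds)
    println(s"flatMap: inserted = ${flag2.get}") // true
  }
}
```

Compiling with -Ywarn-value-discard (Scala 2) can help catch the map variant when the trailing () is left implicit, since the compiler then warns about the discarded Future.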

Upvotes: 2
