Reputation: 55
I am working a Scala app using Slick and PostgreSQL for storage. I have a method foo
which reads data from a CSV file and inserts about 10,000 rows into the database. After the Future of the row insertion action completes, another method bar
is called which retrieves these rows from the database and performs some actions on them. This is where the problem lies: no rows are actually retrieved from the database, as no rows have been inserted by the time the Future completes.
From what I could gather while looking for an answer and at the official documentation, the Future is not supposed to complete before the insert statement is successfully executed. If I add the following code Thread.sleep(30000)
to bar
, allowing the insert statement to be executed first, the methods provide the expected result. Now, for obvious reasons I would rather not do that, so I am looking for alternatives.
The following diagram illustrates the program flow once initial method doStuff
is called:
doStuff
calls foo
which loads the data and stores it in the database, before returning a Future. In doStuff
this Future is then mapped and a call to bar
is made. bar
bar retrieves the rows from the database and processes them. However, since no rows have been inserted at the point where bar
is called, no data is processed.
The doStuff
method:
def doStuff(csvFile: File): Future[Unit] = {
fooService.foo(csvFile)
.map(_ => {
csvFile.delete()
barService.bar()
})
}
The foo
method:
def foo(file: File) Future[Unit] = {
val reader = CSVReader.open(file)
fooStorage.truncateFooData().map(_ => {
val foos = for (line <- reader.iterator if
line.head != "bad1" &&
line.head !="bad2")
yield parseFooData(line)
fooStorage.saveFooDataBulk(foos.toSeq)
})
}
How I am using Slick to insert the rows:
override def saveFooDataBulk(fooSeq: Seq[Foo]): Future[Seq[Foo]] =
db.run(DBIO.seq(fooQuery ++= fooSeq)).map(_ => fooSeq)
I am expecting bar
to be called once all the rows have been inserted into the database and not sooner, however, currently the Future from Slick completes too soon. In case it's relevant: the doStuff
method is called when a request is sent to an Akka Http endpoint. The application and the database are running in two different docker containers. What am I doing wrong?
Also I can't get over how appropriate the username I picked a year and a half ago is now.
Upvotes: 2
Views: 265
Reputation: 44918
Replace map
by flatMap
in the def foo
. Otherwise, it will start a Future[Seq[Foo]]
, then immediately return a ()
, which is then discarded by your doStuff
. Something like this:
fooStorage
.truncateFooData()
.flatMap(_ => {
/* stuff... */
fooStorage.saveFooDataBulk(foo.toSeq)
})
.map(_ => ())
I didn't test it, but in any case, starting some Future
s in the middle of another Future.map
and then immediately returning an ()
doesn't feel quite right.
Upvotes: 2