Reputation: 494
I have an SBT task that uses taskDyn to dynamically create tasks for several CPU-intensive computations from a configuration text file. By default these computation tasks run in parallel, which is great, but now I need to restrict the number of parallel runs (the native library OpenBLAS crashes otherwise). I tried using Tags.CPU to enforce this limit (full example here):
import sbt._
import Keys._
import Def.Initialize
// ...
lazy val runAllValidations = taskKey[Seq[Unit]]("Runs all standard validations")
lazy val validations = settingKey[Seq[String]]("All standard validations")
def validationTaskFor(arguments: String): Initialize[Task[Unit]] =
(runMain in Compile).toTask(s" com.test.foo.validation.RunValidation $arguments") tag(Tags.CPU)
def validationTasksFor(arguments: Seq[String]): Initialize[Task[Seq[Unit]]] = Def.taskDyn {
arguments.map(validationTaskFor).joinWith(_.join)
}
validations := {
val fromFile = IO.read(file("validation_configs.txt"))
fromFile.split("\n").map(_.trim).toList
}
runAllValidations := Def.taskDyn { validationTasksFor(validations.value) }.value
concurrentRestrictions in Global := Seq(
Tags.limit(Tags.CPU, 2)
)
Unfortunately, I can't get the combination of taskDyn and Tags to work; all computations are started immediately:
Started with foo
Started with bla
Started with hoorray
Started with yeeha
Started with yeah
Finished with foo
Finished with hoorray
Finished with bla
Finished with yeeha
Finished with yeah
Is this combination known to not work or am I holding it wrong?
Thanks!
Upvotes: 2
Views: 598
Reputation: 5624
You're a victim of the Any => Unit conversion in Scala. What you're doing is computing some super complicated thing in a task, and then throwing it away by converting it to (), which Scala accepts without complaint.
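To make the value-discarding point concrete, here is a minimal plain-Scala sketch, independent of sbt. All names in it (ValueDiscardDemo, expensive, ignored, executed) are hypothetical; the thunk only loosely stands in for a Task that gets built but never scheduled.

```scala
object ValueDiscardDemo {
  // Flips to true only if the deferred work is actually executed.
  var ran = false

  // Builds a description of work (a thunk) without running it,
  // loosely analogous to constructing a Task. Hypothetical helper.
  def expensive(): () => Int = () => { ran = true; 42 }

  // The declared result type is Unit, so the compiler accepts this body
  // and silently drops the thunk: nothing ever invokes it.
  def ignored(): Unit = { expensive() }

  // Invoking the thunk explicitly is what actually runs the work.
  def executed(): Int = expensive()()
}
```

Calling ignored() compiles and returns (), but the work inside the thunk never runs. Compiling with the -Ywarn-value-discard option (Scala 2) makes the compiler warn about this kind of discarded value.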
Here's some code which uses the lower-level Initialize API to fold over the tasks (something I'd like to expose more directly in the future).
My build.sbt:
def dummyTaskGen(name: String): Def.Initialize[Task[Unit]] = Def.task {
System.err.println(s"Started ${name}")
Thread.sleep(1000L*6)
System.err.println(s"Finished ${name}")
}
lazy val validations = taskKey[Unit]("Run everything, one at a time.")
lazy val names = Seq("foo", "bar", "baz", "biz", "buzz", "bunk")
lazy val allRuns: Def.Initialize[Task[Unit]] = Def.settingDyn {
val zero: Def.Initialize[Seq[Task[Unit]]] = Def.setting { Seq(task(())) }
names.map(dummyTaskGen).foldLeft(zero) { (acc, current) =>
acc.zipWith(current) { case (taskSeq, task) =>
taskSeq :+ task.tag(Tags.CPU)
}
} apply { tasks: Seq[Task[Unit]] =>
tasks.join map { seq => () /* Ignore the sequence of unit returned */ }
}
}
validations := allRuns.value
concurrentRestrictions in Global ++= Seq(
Tags.limit(Tags.CPU, 2)
)
And on the command line:
> validations
Started bar
Started buzz
Finished buzz
Finished bar
Started biz
Started bunk
Finished biz
Finished bunk
Started baz
Started foo
Finished baz
Finished foo
[success] Total time: 18 s, completed Jul 7, 2014 2:11:43 PM
Let's dive into the methods we use here.
First, the key is that you'll be zipWith-ing the entire initialization of tasks together.
For example, in your project, you have a mechanism to generate a tagged task called validationTaskFor. This returns a Def.Initialize[Task[Unit]], where the task is appropriately tagged.
So, now you have a sequence of Strings, and you can turn this into a sequence of initializations. From here, we need to merge all the Initialize outer layers to get at the juicy goodness in the Task[_] layer.
See: http://www.scala-sbt.org/0.13.5/api/index.html#sbt.Init$Initialize for the API of initialize.
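As a rough analogy for merging the outer layers, the sketch below folds a sequence of wrapped values into one wrapped sequence using zipWith. Init here is a hypothetical stand-in written for illustration, not sbt's Initialize; only the shape of the fold is meant to carry over.

```scala
// Hypothetical stand-in for an applicative container such as sbt's Initialize.
final case class Init[A](run: () => A) {
  // Combine two containers by combining the values they will produce.
  def zipWith[B, C](other: Init[B])(f: (A, B) => C): Init[C] =
    Init(() => f(run(), other.run()))
}

object InitFoldDemo {
  // Turn Seq[Init[A]] into Init[Seq[A]] by folding with zipWith.
  def sequence[A](inits: Seq[Init[A]]): Init[Seq[A]] = {
    // The zero of the fold: a container holding an empty sequence.
    val zero: Init[Seq[A]] = Init(() => Seq.empty[A])
    inits.foldLeft(zero) { (acc, current) =>
      // Append the value of the current container to the accumulated ones.
      acc.zipWith(current) { (seq, a) => seq :+ a }
    }
  }
}
```

Once everything sits inside a single container, the inner values can be handled together, which is the position the fold over Initialize puts the tasks in.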
NOTE: Mostly, Initialize is hidden behind macros (like Def.task, Def.setting, Def.inputTask, := and friends). However, when working with dynamic task generation, the exposed APIs aren't quite sufficient, and you hit one of those areas where Scala's implicit inference plus our macros cause things to compile which are ridiculous in nature (i.e. discarding all the Task[_] instances generated and returning () for no good reason).
Now, on to the meat of the function.
Def.settingDyn requires an Initialize[T]. In our case, T is Task[Unit]. So we know we need to get the expression underneath into an Initialize[Task[Unit]], so let's type the allRuns value appropriately to get better compiler error messages:
def allRuns: Def.Initialize[Task[Unit]] = Def.settingDyn { ... }
Def.Initialize is an applicative, and we'll have a sequence of these. This means folding, so we should create a zero for the fold. In this case, as we merge all the tasks together, we'll be returning an Initialize[Seq[Task[Unit]]], where all the tasks are inside a single Initialize container and we can interact with them together. (This is akin to ensuring that all the tasks are initialized/constructed correctly from State before we attempt to use them.)
val zero: Def.Initialize[Seq[Task[Unit]]] = Def.setting { Seq(task( () )) }
Now, we convert our Seq[String] into a Seq[Initialize[Task[Unit]]]:
names.map(dummyTaskGen)
We fold over this sequence:
names.map(dummyTaskGen).foldLeft(zero) { (acc, current) => ... }
We define how to join an Initialize[Seq[Task[Unit]]] with an Initialize[Task[Unit]], i.e. we add the new task into the existing list of tasks:
acc.zipWith(current) { case (taskSeq, task) =>
taskSeq :+ task.tag(Tags.CPU)
}
We now have an Initialize[Seq[Task[Unit]]].
We join all these together:
tasks.join /* Task[Seq[Unit]] */
We ignore the Seq[Unit] result, converting it into a Unit:
tasks.join.map { ignore => () }
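Putting the steps together for the build in the question might look roughly like the sketch below. It is untested and assumes sbt 0.13 syntax, the key names from the question, and that the same fold works for tasks produced by runMain; allValidations is a hypothetical name, and runAllValidations is retyped to Unit here because the joined results are discarded.

```scala
import sbt._
import Keys._
import Def.Initialize

lazy val validations = settingKey[Seq[String]]("All standard validations")
lazy val runAllValidations = taskKey[Unit]("Runs all standard validations")

// One runMain invocation per argument line (tagging happens in the fold below).
def validationTaskFor(arguments: String): Initialize[Task[Unit]] =
  (runMain in Compile).toTask(s" com.test.foo.validation.RunValidation $arguments")

// Hypothetical name: folds every generated task into one Initialize,
// tags each with Tags.CPU, then joins them into a single Task[Unit].
lazy val allValidations: Initialize[Task[Unit]] = Def.settingDyn {
  val zero: Initialize[Seq[Task[Unit]]] = Def.setting { Seq(task(())) }
  validations.value.map(validationTaskFor).foldLeft(zero) { (acc, current) =>
    acc.zipWith(current) { case (taskSeq, t) =>
      taskSeq :+ t.tag(Tags.CPU)
    }
  } apply { tasks: Seq[Task[Unit]] =>
    tasks.join map { _ => () } // ignore the Seq[Unit] result
  }
}

validations := IO.read(file("validation_configs.txt")).split("\n").map(_.trim).toList

runAllValidations := allValidations.value

// At most two CPU-tagged tasks run at the same time.
concurrentRestrictions in Global += Tags.limit(Tags.CPU, 2)
```

Reading validations.value inside Def.settingDyn relies on validations being a setting, as it is in the question; if it were a task, the list of arguments would not be available at this level.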
You should do three things:
Def.task or Def.taskDyn macro to directly expose adding tags to tasks, thereby removing one of the two things requiring you to go down to this level of the API.
Initialize[Seq[Initialize[Task[T]]]] should be joinable without the hoopla, since it's the same dirty code every time someone wants to do it.
Upvotes: 6