Nightscape

Reputation: 494

Combining taskDyn and Tags in SBT

I have an SBT task that uses taskDyn to dynamically create tasks for several CPU-intensive computations from a configuration text file. By default these computation tasks run in parallel, which is great, but now I need to restrict the number of parallel runs (the native library OpenBLAS crashes otherwise). I tried using Tags.CPU to enforce this limit (full example here):

import sbt._
import Keys._
import Def.Initialize

// ...

lazy val runAllValidations = taskKey[Seq[Unit]]("Runs all standard validations")

lazy val validations = settingKey[Seq[String]]("All standard validations")

def validationTaskFor(arguments: String): Initialize[Task[Unit]] =
  (runMain in Compile).toTask(s" com.test.foo.validation.RunValidation $arguments") tag(Tags.CPU)

def validationTasksFor(arguments: Seq[String]): Initialize[Task[Seq[Unit]]] = Def.taskDyn {
  arguments.map(validationTaskFor).joinWith(_.join)
}

validations := {
  val fromFile = IO.read(file("validation_configs.txt"))
  fromFile.split("\n").map(_.trim).toList
}

runAllValidations := Def.taskDyn { validationTasksFor(validations.value) }.value

concurrentRestrictions in Global := Seq(
  Tags.limit(Tags.CPU, 2)
)

Unfortunately, I can't get the combination of taskDyn and Tags to work; all computations are started immediately:

Started with foo
Started with bla
Started with hoorray
Started with yeeha
Started with yeah
Finished with foo
Finished with hoorray
Finished with bla
Finished with yeeha
Finished with yeah

Is this combination known to not work or am I holding it wrong?

Thanks!

Upvotes: 2

Views: 598

Answers (1)

jsuereth

Reputation: 5624

You're a victim of the Any => Unit conversion (value discarding) in Scala. What you're doing is computing some super complicated thing in a task, and THEN throwing it away by converting it to (), which Scala accepts without complaint.
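To make the trap concrete, here is a minimal plain-Scala sketch of value discarding, independent of sbt. The `Lazy` wrapper and the names `describeWork`, `oops` and `fixed` are hypothetical stand-ins for a task description; they are not sbt APIs.

```scala
object ValueDiscardDemo {
  // Hypothetical stand-in for a task: nothing happens until `run()` is called.
  final case class Lazy[A](run: () => A)

  // Counts how often the wrapped work was actually executed.
  var executed: Int = 0

  // Builds a description of some work without running it.
  def describeWork(): Lazy[Int] = Lazy(() => { executed += 1; 42 })

  // The expected type is Unit, so the compiler accepts this body and
  // silently drops the Lazy[Int]; the work inside is never run.
  def oops(): Unit = describeWork()

  // Running the description explicitly is what actually executes the work.
  def fixed(): Int = describeWork().run()
}
```

Calling `ValueDiscardDemo.oops()` leaves `executed` untouched, while `fixed()` runs the work once. The compiler option -Ywarn-value-discard (spelled -Wvalue-discard in Scala 2.13) can flag this kind of silent discard.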

Here's some code which uses the lower-level Initialize API to fold over the tasks (something I'd like to expose more directly in the future).

My build.sbt:

def dummyTaskGen(name: String): Def.Initialize[Task[Unit]] = Def.task {
   System.err.println(s"Started  ${name}")
   Thread.sleep(1000L*6)
   System.err.println(s"Finished ${name}")
}


lazy val validations = taskKey[Unit]("Run everything, one at a time.")

lazy val names = Seq("foo", "bar", "baz", "biz", "buzz", "bunk")

lazy val allRuns: Def.Initialize[Task[Unit]] = Def.settingDyn {
  val zero: Def.Initialize[Seq[Task[Unit]]] = Def.setting {  Seq(task(())) }
  names.map(dummyTaskGen).foldLeft(zero) { (acc, current) =>
     acc.zipWith(current) {  case (taskSeq, task) =>
       taskSeq :+ task.tag(Tags.CPU)
     }
  } apply { tasks: Seq[Task[Unit]] =>
    tasks.join map { seq => () /* Ignore the sequence of unit returned */ }
  }
}

validations := allRuns.value

concurrentRestrictions in Global ++= Seq(
  Tags.limit(Tags.CPU, 2)
)

And on the command line:

> validations
Started  bar
Started  buzz
Finished buzz
Finished bar
Started  biz
Started  bunk
Finished biz
Finished bunk
Started  baz
Started  foo
Finished baz
Finished foo
[success] Total time: 18 s, completed Jul 7, 2014 2:11:43 PM
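The timing is consistent with the limit: six tasks of six seconds each, two at a time, give three rounds and roughly 18 seconds. As a rough analogy only (sbt schedules from tags and restriction rules, not from a fixed thread pool), the effect of capping concurrency at two can be pictured with this plain-Scala sketch. `LimitDemo` and `maxConcurrent` are illustrative names, not sbt APIs.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object LimitDemo {
  // Runs `jobs` sleeping jobs on a pool of `limit` threads and returns the
  // highest number of jobs observed running at the same time.
  def maxConcurrent(jobs: Int, limit: Int, sleepMillis: Long): Int = {
    val pool = Executors.newFixedThreadPool(limit)
    val lock = new Object
    var running = 0
    var peak = 0
    (1 to jobs).foreach { _ =>
      pool.execute(new Runnable {
        def run(): Unit = {
          // Record that one more job is active and update the peak.
          lock.synchronized { running += 1; peak = math.max(peak, running) }
          Thread.sleep(sleepMillis)
          lock.synchronized { running -= 1 }
        }
      })
    }
    // Stop accepting jobs and wait for the submitted ones to finish.
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.MINUTES)
    lock.synchronized(peak)
  }
}
```

With six jobs and a limit of two, the returned peak never exceeds two, which mirrors the pairs of "Started" lines in the output above.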

Let's dive into the methods we use here:

First, the key is that you'll be zipWith-ing the entire initialization of tasks together.

For example, in your project, you have a mechanism to generate a tagged task called validationTaskFor. This returns a Def.Initialize[Task[Unit]], where the task is appropriately tagged.

So, now you have a sequence of Strings, and you can turn this into a sequence of initializations. From here, we need to merge all the outer Initialize layers into one to get at the juicy goodness in the Task[_] layer.
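The same "merge the outer layers" idea can be sketched in plain Scala, using Option as a stand-in for Initialize. This is only an analogy: the `zipWith` below is a hypothetical helper written for the sketch, not the sbt method, and `SequenceDemo` is an illustrative name.

```scala
object SequenceDemo {
  // Combines two wrapped values, similar in spirit to Initialize#zipWith.
  def zipWith[A, B, C](a: Option[A], b: Option[B])(f: (A, B) => C): Option[C] =
    for (x <- a; y <- b) yield f(x, y)

  // Turns a Seq of wrappers into one wrapper around a Seq by folding,
  // starting from a wrapped empty sequence as the zero.
  def sequence[A](items: Seq[Option[A]]): Option[Seq[A]] = {
    val zero: Option[Seq[A]] = Some(Seq.empty[A])
    items.foldLeft(zero) { (acc, current) =>
      zipWith(acc, current)(_ :+ _)
    }
  }
}
```

Once everything sits inside a single wrapper, the inner values can be handled together, which is what the fold in the build file does for the tasks.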

See: http://www.scala-sbt.org/0.13.5/api/index.html#sbt.Init$Initialize for the API of initialize.

NOTE: MOSTLY, Initialize is hidden behind macros (like Def.task, Def.setting, Def.inputTask, := and friends). However, when working with dynamic task generation, the exposed APIs aren't quite sufficient, and you hit one of those areas where Scala's implicit inference plus our macros let ridiculous things compile (i.e. discarding all the Task[_] instances generated and returning () for no good reason).

NOW, on to the meat of the function:

  1. Def.settingDyn requires an Initialize[T]. In our case, T is Task[Unit]. So we know we need to get the expression underneath into an Initialize[Task[Unit]], so let's type the allRuns value appropriately to get better compiler error messages.

    lazy val allRuns: Def.Initialize[Task[Unit]] = Def.settingDyn { ... }

  2. Def.Initialize is an applicative, and we'll have a sequence of these. This means folding, so we should create a zero for the fold. In this case, as we merge all the tasks together, we'll be returning an Initialize[Seq[Task[Unit]]] where all the tasks are inside a single Initialize container, where we can interact with them together. (This is akin to ensuring that all the tasks are initialized/constructed correctly from State before we attempt to use them.)

    val zero: Def.Initialize[Seq[Task[Unit]]] = Def.setting { Seq(task( () )) }

  3. Now, we convert our Seq[String] into a Seq[Initialize[Task[Unit]]]:

    names.map(dummyTaskGen)

  4. We fold over this sequence:

    names.map(dummyTaskGen).foldLeft(zero) { (acc, current) => ... }

  5. We define how to join an Initialize[Seq[Task[Unit]]] with an Initialize[Task[Unit]], i.e. we add the new task into the existing list of tasks:

    acc.zipWith(current) { case (taskSeq, task) => taskSeq :+ task.tag(Tags.CPU) }

  6. We now have an Initialize[Seq[Task[Unit]]]. We join all these together:

    tasks.join /* Task[Seq[Unit]] */

  7. We ignore the Seq[Unit] result, converting it into a Unit:

    tasks.join.map { ignore => () }
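The steps above can be bundled into a small helper and applied to the definitions from the question. This is an untested sketch in sbt 0.13.x syntax: `joinTagged` is a hypothetical name, `validations` and `validationTaskFor` are assumed to be the definitions from the question, and the key type of runAllValidations is changed from Seq[Unit] to Unit to match the joined result.

```scala
// build.sbt fragment (sbt 0.13.x syntax, untested sketch)

// Hypothetical helper: merges the Initialize layers, tags every task with
// Tags.CPU, and joins the tasks into a single Task[Unit].
def joinTagged(inits: Seq[Def.Initialize[Task[Unit]]]): Def.Initialize[Task[Unit]] = {
  // Same zero as in the answer: one no-op task inside an Initialize.
  val zero: Def.Initialize[Seq[Task[Unit]]] = Def.setting { Seq(task(())) }
  inits.foldLeft(zero) { (acc, current) =>
    acc.zipWith(current) { case (taskSeq, t) => taskSeq :+ t.tag(Tags.CPU) }
  } apply { tasks: Seq[Task[Unit]] =>
    tasks.join map { _ => () } // drop the Seq[Unit] on purpose
  }
}

// Assumption: replaces the taskKey[Seq[Unit]] declared in the question.
lazy val runAllValidations = taskKey[Unit]("Runs all standard validations")

// `validations` is a setting, so it can be read inside Def.settingDyn.
runAllValidations := Def.settingDyn {
  joinTagged(validations.value.map(validationTaskFor))
}.value
```

Keeping the fold in one place means the explicit discard of the Seq[Unit] happens exactly once and is visible, rather than being inserted silently by the compiler.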

You should do three things:

  1. Open a ticket about exposing a mechanism within the Def.task or Def.taskDyn macro to directly expose adding tags to tasks, thereby removing one of the two things requiring you to go down to this level of the API.
  2. Open a ticket about exposing, directly, a mechanism to join tasks. E.g. Initialize[Seq[Initialize[Task[T]]]] should be joinable without the hoopla, since it's the same dirty code every time someone wants to do it.
  3. Open a ticket about the poor interaction of the automatic task macro and Scala's implicit Any => Unit causing very serious bugs.

Upvotes: 6
