Reputation: 12817
I have 2 tasks, each a subclass of luigi.contrib.external_program.ExternalProgramTask.
Each task calls a bash script in a different directory, and I want to add a dependency between them:
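    import luigi
    from luigi.contrib.external_program import ExternalProgramTask

    # my_script_a and my_script_b are command strings defined elsewhere.

    class ATask(ExternalProgramTask):
        def program_args(self):
            return my_script_a.split()

    class BTask(ExternalProgramTask):
        def requires(self):
            return ATask()

        def program_args(self):
            return my_script_b.split()

    if __name__ == '__main__':
        luigi.build([BTask()], workers=2, local_scheduler=False)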
The error is this:
    File ".../local/lib/python2.7/site-packages/luigi/worker.py", line 182, in run
      raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
    RuntimeError: Unfulfilled dependency at run time: ATask__...
From what I understand, each task finishes ok by itself (I can see ATask finished ok in the visualizer, and BTask outputs what is expected from a successful run), but the dependency is somehow messed up.
From what I saw, I need to define an output method and make sure the target exists, something like adding:

    def output(self):
        return luigi.LocalTarget('foo')

to ATask. But since I can't override run in these tasks (because they are ExternalProgramTask subclasses), I can't create that file (target). How can I make BTask understand that ATask has finished ok?
Upvotes: 0
Views: 241
Reputation: 1160
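Even though this is old, in case someone else has the same question: you can override run, as long as you call the parent's run first.

    class ATask(ExternalProgramTask):
        def program_args(self):
            return my_script_a.split()

        def run(self):
            # Run the external program first. On Python 2, write this
            # call as super(ATask, self).run() instead.
            super().run()
            # Then write the marker file that output() points to.
            with self.output().open("w") as f:
                f.write("external completed")

        def output(self):
            return luigi.LocalTarget("external_completed.txt")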
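Some context on why the marker file helps: before running a task, luigi checks that every required task reports complete(), and by default a task counts as complete only if it declares at least one output and all of its outputs exist. A task with no output() therefore never counts as complete, which is what the "Unfulfilled dependency" error reports. Below is a simplified stand-in for that rule, as I understand it. It is not luigi's code, and SketchTask, output_paths and unfulfilled are hypothetical names used only for illustration.

```python
import os


class SketchTask(object):
    """Hypothetical stand-in for a task; this is not a luigi class."""

    def output_paths(self):
        # Paths of the files this task promises to create (none by default).
        return []

    def complete(self):
        paths = self.output_paths()
        # No declared output means "never complete", mirroring the default rule.
        if not paths:
            return False
        # Otherwise the task is complete once every declared file exists.
        return all(os.path.exists(p) for p in paths)


def unfulfilled(dependencies):
    """Return the dependencies that would be reported as unfulfilled."""
    return [dep for dep in dependencies if not dep.complete()]
```

With this model, a task that declares no output always shows up in unfulfilled(), no matter how well its script ran, while a task whose marker file exists does not.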
Upvotes: 0
Reputation: 61
In ATask, add a complete() method that checks whether ATask has finished ok, returning True if it has and False if it has not. It can combine several checks.
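A minimal sketch of what such a complete() could look like, assuming the bash script writes known files when it succeeds. The names outputs_look_complete, ScriptOutputCheck and expected_files are hypothetical and not part of luigi, and the two checks (file exists, file is non-empty) are only examples of the "several checks" idea.

```python
import os


def outputs_look_complete(paths):
    """Return True only if there is at least one path and every path
    is an existing, non-empty file."""
    return bool(paths) and all(
        os.path.isfile(p) and os.path.getsize(p) > 0 for p in paths
    )


class ScriptOutputCheck(object):
    """Mixin sketch: the task is complete once its script's files are in place.

    expected_files is a hypothetical attribute listing the files the bash
    script is assumed to write; it is not a luigi attribute.
    """

    expected_files = ()

    def complete(self):
        return outputs_look_complete(self.expected_files)
```

The idea would be to list the mixin before ExternalProgramTask in ATask's base classes, so that its complete() is the one that gets called, and to set expected_files to whatever my_script_a actually produces. I have not tested this against a real scheduler, so treat it as a starting point.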
Upvotes: 0