Reputation: 895
I want to execute MyTask1 and then MyTask2, but only MyTask1 executes. Luigi reports that I have an Unfulfilled dependency. However, MyTask1 prints...
```
====================
MyTask1: 5
====================
```
so I know that MyTask1 did execute.
This POC is in preparation for a project where I am going to be chaining a bunch of tasks that do work but don't generally create files (output).
Here is my code...
```python
from enum import Enum
import luigi


class MyTask1(luigi.Task):
    x = luigi.IntParameter()
    y = luigi.IntParameter(default=0)
    task_complete = False

    def run(self):
        print(f"{'='*20}\nMyTask1: {self.x + self.y}\n{'='*20}")
        self.task_complete = True

    def complete(self):
        return self.task_complete


class MyTask2(luigi.Task):
    x = luigi.IntParameter()
    y = luigi.IntParameter(default=1)
    z = luigi.IntParameter(default=2)
    task_complete = False

    def requires(self):
        return MyTask1(x=self.x, y=self.y)

    def run(self):
        print(f"{'='*20}\nMyTask2: {self.x * self.y * self.z}\n{'='*20}")
        self.task_complete = True

    def complete(self):
        return self.task_complete


if __name__ == '__main__':
    luigi.build([MyTask2(x=3, y=2)], workers=3, local_scheduler=True)
```
Upvotes: 0
Views: 34
Reputation: 8290
Your misunderstanding here is assuming that `complete()` will be called on the same MyTask1 class or instance that executed the `run()` method. Anything you do to set the instance or class variable `task_complete` will not be available when Luigi later checks completion: with `workers=3`, Luigi runs each task's `run()` in a separate worker process, so the assignment never reaches the process that calls `complete()`. (To test this, add the line `print("checking task 1 completion", MyTask1.task_complete)` to your `complete()` method.)
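To see the effect without Luigi, here is a stdlib-only sketch. It assumes, as described above, that `run()` executes in a child process while completion is checked in the parent; `FlagTask` and `demo()` are hypothetical stand-ins, not Luigi code.

```python
import multiprocessing


class FlagTask:
    """Hypothetical stand-in for a task that tracks completion in memory only."""

    task_complete = False  # class attribute, like task_complete in MyTask1

    def run(self):
        # Sets an instance attribute in whichever process executes run().
        self.task_complete = True

    def complete(self):
        return self.task_complete


def demo():
    task = FlagTask()

    # Run the task in a child process, as a scheduler with several workers might.
    child = multiprocessing.Process(target=task.run)
    child.start()
    child.join()
    after_child_run = task.complete()  # the child's assignment is not visible here

    # Run the same task in this process for comparison.
    task.run()
    after_local_run = task.complete()

    return after_child_run, after_local_run


if __name__ == "__main__":
    print(demo())  # (False, True)
```

The in-memory flag only "works" when `run()` and `complete()` share a process, which is exactly what a multi-worker scheduler does not guarantee.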
Something persistent has to change so that a later check of MyTask1 can tell that it completed. Even if you don't need a file, you can simply write out a sentinel file named with a unique id for that pipeline run and use the normal `output()` approach, so that the existence of that target marks the task complete.
The reason for this design is that pipelines fail, and Luigi needs to know which steps to repeat so that you don't have to re-run the whole pipeline every time.
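A stdlib-only toy model of that idea (not Luigi's actual scheduler): completion is recorded as files, so after a failure a second attempt runs only the steps that have no sentinel yet. The step names, the `fail_at` switch used to simulate a crash, and the `run_steps()`/`demo()` helpers are all hypothetical.

```python
import os
import tempfile


def run_steps(steps, sentinel_dir, fail_at=None):
    """Run (name, func) steps in order, skipping any whose sentinel file exists.

    Returns the names of the steps that actually executed on this attempt.
    """
    executed = []
    for name, func in steps:
        sentinel = os.path.join(sentinel_dir, f"{name}.done")
        if os.path.exists(sentinel):
            continue  # finished on an earlier attempt; nothing to redo
        if name == fail_at:
            raise RuntimeError(f"simulated failure in {name}")
        func()
        executed.append(name)
        # Record completion only after the step succeeded.
        with open(sentinel, "w") as f:
            f.write("done\n")
    return executed


def demo():
    steps = [("step1", lambda: None), ("step2", lambda: None), ("step3", lambda: None)]
    with tempfile.TemporaryDirectory() as d:
        try:
            run_steps(steps, d, fail_at="step2")  # step1 finishes, then step2 fails
        except RuntimeError:
            pass
        return run_steps(steps, d)  # retry: step1 is skipped


if __name__ == "__main__":
    print(demo())  # ['step2', 'step3']
```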
Upvotes: 0