Jason V

Using Luigi for a small POC to run processes that do not create files, but it doesn't seem to work

I want to execute MyTask1 and then MyTask2, but only MyTask1 executes. Luigi reports an unfulfilled dependency, even though it prints...

====================
MyTask1: 5
====================

so I know that MyTask1 did execute.

This POC is in preparation for a project where I am going to be chaining a bunch of tasks that do work but don't generally create files (output).

Here is my code...

import luigi


class MyTask1(luigi.Task):

    x = luigi.IntParameter()
    y = luigi.IntParameter(default=0)

    task_complete = False

    def run(self):
        print(f"{'='*20}\nMyTask1: {self.x + self.y}\n{'='*20}")
        self.task_complete = True

    def complete(self):
        return self.task_complete


class MyTask2(luigi.Task):

    x = luigi.IntParameter()
    y = luigi.IntParameter(default=1)
    z = luigi.IntParameter(default=2)

    task_complete = False

    def requires(self):
        return MyTask1(x=self.x, y=self.y)

    def run(self):
        print(f"{'='*20}\nMyTask2: {self.x * self.y * self.z}\n{'='*20}")
        self.task_complete = True

    def complete(self):
        return self.task_complete


if __name__ == '__main__':
    luigi.build([MyTask2(x=3,y=2)], workers=3, local_scheduler=True)

Answers (1)

MattMcKnight

Your misunderstanding is assuming that complete() will be called on the same MyTask1 instance, in the same state, as the one whose run() method executed. Whatever you do in run() to set an instance or class variable such as task_complete will not be available when complete() is checked. (To test this, add the line print("checking task 1 completion", MyTask1.task_complete) to your complete() method.)
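To see why the flag is lost: with more than one worker (workers=3 here), Luigi runs each task in its own worker process, so an attribute set in run() exists only in that process. The stdlib-only sketch below mimics this without Luigi. FlagTask, execute_in_child and run_and_check are hypothetical names, and the child process stands in for a Luigi worker process.

```python
import multiprocessing


class FlagTask:
    """Hypothetical stand-in for a task that records completion only in memory."""

    task_complete = False  # class attribute, like in the question

    def run(self):
        # Sets an instance attribute in whichever process executes run().
        self.task_complete = True

    def complete(self):
        return self.task_complete


def execute_in_child(task):
    # Plays the role of a worker process running the task.
    task.run()


def run_and_check():
    task = FlagTask()
    child = multiprocessing.Process(target=execute_in_child, args=(task,))
    child.start()
    child.join()
    # The child mutated its own copy of `task`; the parent's copy is untouched.
    return task.complete()


if __name__ == "__main__":
    print(run_and_check())  # False
```

The parent then behaves like the process that later checks the dependency: it still sees task_complete as False.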

Something outside the task object has to change so that a fresh check of MyTask1 can tell that it was completed. Even if you don't need a file, you can write a sentinel file named with a unique id for that run of the pipeline, and use the normal output() approach so that the existence of that target marks the task complete.

The reason for this design is that pipelines fail, and Luigi needs to know which steps to repeat so that you don't have to re-run the whole pipeline every time.
