I have the following Luigi setup:
import luigi

class RootTask(luigi.WrapperTask):
    def requires(self):
        yield RunAndReport(
            task=TaskA()
        )

class RunAndReport(luigi.WrapperTask):
    task = luigi.TaskParameter()

    def requires(self):
        yield self.task
        yield Report(
            input=self.task.input_file(),
            output=self.task.output_file()
        )

class Report(luigi.Task):
    pass

class TaskA(luigi.Task):
    def input_file(self):
        return 'file://opt/something_in.txt'

    def output_file(self):
        return 'file://opt/something_out.txt'
and I am getting the following error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/luigi-3.0.3-py3.9.egg/luigi/worker.py", line 401, in check_complete
    is_complete = task.complete()
  File "/usr/local/lib/python3.9/dist-packages/luigi-3.0.3-py3.9.egg/luigi/task.py", line 822, in complete
    return all(r.complete() for r in flatten(self.requires()))
  File "/usr/local/lib/python3.9/dist-packages/luigi-3.0.3-py3.9.egg/luigi/task.py", line 883, in flatten
    for result in iterator:
  File "/opt/my_file.py", line 48, in requires
    input_file=self.task.input_file(),
TypeError: input_file() missing 1 required positional argument: 'self'
It is as if TaskA is never instantiated and only the class itself is passed. Is this some kind of weird Python lazy-initialization behaviour?
What I actually ended up doing was:
import luigi

class RootTask(luigi.WrapperTask):
    def requires(self):
        task_a = TaskA()
        yield task_a
        yield Report(
            input=task_a.input_file(),
            output=task_a.output_file()
        )

class Report(luigi.Task):
    pass

class TaskA(luigi.Task):
    def input_file(self):
        return 'file://opt/something_in.txt'

    def output_file(self):
        return 'file://opt/something_out.txt'
There is some repetition, but it works.
TaskParameter() takes a class, not an instance.
I found this alternative, which does work with task instances, although Luigi doesn't really seem to be designed to be used that way. It is probably easier to pass just the Task class and send any other values as additional parameters.
https://github.com/spotify/luigi/issues/1945#issuecomment-291759850
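import luigi
import json

class SubtaskParameter(luigi.Parameter):
    # Store the task family name together with the task's parameter values.
    # Only JSON-serializable parameter values survive this step.
    def serialize(self, val):
        return json.dumps((val.get_task_family(), val.param_args))

    # Look the family name up in Luigi's task registry and rebuild an
    # instance, passing the stored parameter values back positionally.
    def parse(self, val):
        (cls, params) = json.loads(val)
        cls = luigi.task_register.Register.get_task_cls(cls)
        return cls(*params)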