Reputation: 2130
I am relatively new to creating flows with Luigi and am trying to understand why my small workflow is resulting in an unfulfilled dependency. I am trying to run the task StageProviders(), which has a single dependency ErrorsLogFile(). The tasks that must be run before StageProviders are simply tasks to create blank files on a shared drive. I receive following message when I try and run the StageProviders task in the following flow as follows:
Code:
#!/usr/local/bin/python
import luigi
import os
import shutil
import time
import pandas as pd
import time
class DupsExistingLogFile(luigi.Task):
filename = luigi.Parameter()
def requires(self):
return None
def output(self):
timestr = time.strftime("%Y-%m-%d")
return luigi.LocalTarget(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_DuplicatesExisting.xlsx"))
def run(self):
timestr = time.strftime("%Y-%m-%d")
src_blank_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/Provider_Blank_DONOTDELETE.xlsx')
dest_file_str = os.path.join(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_DuplicatesExisting.xlsx"))
shutil.copyfile(src_blank_file_str, dest_file_str)
class DupsLogFile(luigi.Task):
filename = luigi.Parameter()
def requires(self):
return DupsExistingLogFile(self.filename)
def output(self):
timestr = time.strftime("%Y-%m-%d")
return luigi.LocalTarget(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_Duplicates.xlsx"))
def run(self):
timestr = time.strftime("%Y-%m-%d")
src_blank_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/Provider_Blank_DONOTDELETE.xlsx')
dest_file_str = os.path.join(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_Duplicates.xlsx"))
shutil.copyfile(src_blank_file_str, dest_file_str)
class ErrorsLogFile(luigi.Task):
filename = luigi.Parameter()
def requires(self):
return DupsLogFile(self.filename)
def output(self):
timestr = time.strftime("%Y-%m-%d")
return luigi.LocalTarget(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_Errprs.xlsx"))
def run(self):
timestr = time.strftime("%Y-%m-%d")
src_blank_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/Provider_Blank_DONOTDELETE.xlsx')
dest_file_str = os.path.join(os.path.join('/root/etc/mnt/Import/LogFiles/' + os.path.splitext(self.filename)[0] + '_' + timestr + "_Errors.xlsx"))
shutil.copyfile(src_blank_file_str, dest_file_str)
class StageProviders(luigi.Task):
filename = luigi.Parameter()
def requires(self):
return ErrorsLogFile(self.filename)
def output(self):
timestr = time.strftime("%Y-%m-%d")
return luigi.LocalTarget(os.path.join('/root/etc/mnt/Import/LogFiles/_SUCCESS_STG_' + os.path.splitext(self.filename)[0] + '_' + timestr + '.txt'))
def run(self):
timestr = time.strftime("%Y-%m-%d")
filepath_str = '/root/etc/mnt/Import/' + self.filename
xls_file = pd.ExcelFile(filepath_str)
df = xls_file.parse('Sheet1')
src_blank_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/_SUCCESS.txt')
dest_file_str = os.path.join('/root/etc/mnt/Import/LogFiles/_SUCCESS_STG_' + os.path.splitext(self.filename)[0] + '_' + timestr + '.txt')
if not df.empty:
shutil.copyfile(src_blank_file_str, dest_file_str)
with self.output().open('w') as out_file:
for name in df['NP']:
print(name, end='\n', file=out_file)
Output:
root@ubuntu:~/pythonfiles/luigi_POC/cpi_luigi_poc/src# python3 -m luigi --module provider_import StageProviders --filename CCM_provider_sample.xlsx --
local-scheduler
DEBUG: Checking if StageProviders(filename=CCM_provider_sample.xlsx) is complete
DEBUG: Checking if ErrorsLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task StageProviders_CCM_provider_sam_ad65b206fd has status PENDING
DEBUG: Checking if DupsLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task ErrorsLogFile_CCM_provider_sam_ad65b206fd has status PENDING
DEBUG: Checking if DupsExistingLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task DupsLogFile_CCM_provider_sam_ad65b206fd has status PENDING
INFO: Informed scheduler that task DupsExistingLogFile_CCM_provider_sam_ad65b206fd has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 4
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running DupsExistingLogFile(filename=CCM_provider_sample.xlsx)
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) done DupsExistingLogFile(filename=CCM_provider_sample.xlsx)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task DupsExistingLogFile_CCM_provider_sam_ad65b206fd has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 3
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running DupsLogFile(filename=CCM_provider_sample.xlsx)
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) done DupsLogFile(filename=CCM_provider_sample.xlsx)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task DupsLogFile_CCM_provider_sam_ad65b206fd has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running ErrorsLogFile(filename=CCM_provider_sample.xlsx)
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) done ErrorsLogFile(filename=CCM_provider_sample.xlsx)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task ErrorsLogFile_CCM_provider_sam_ad65b206fd has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running StageProviders(filename=CCM_provider_sample.xlsx)
ERROR: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) failed StageProviders(filename=CCM_provider_sample.xlsx)
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/luigi/worker.py", line 175, in run
raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
RuntimeError: Unfulfilled dependency at run time: ErrorsLogFile_CCM_provider_sam_ad65b206fd
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task StageProviders_CCM_provider_sam_ad65b206fd has status FAILED
DEBUG: Checking if StageProviders(filename=CCM_provider_sample.xlsx) is complete
DEBUG: Checking if ErrorsLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task StageProviders_CCM_provider_sam_ad65b206fd has status PENDING
DEBUG: Checking if DupsLogFile(filename=CCM_provider_sample.xlsx) is complete
INFO: Informed scheduler that task ErrorsLogFile_CCM_provider_sam_ad65b206fd has status PENDING
INFO: Informed scheduler that task DupsLogFile_CCM_provider_sam_ad65b206fd has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running ErrorsLogFile(filename=CCM_provider_sample.xlsx)
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) done ErrorsLogFile(filename=CCM_provider_sample.xlsx)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task ErrorsLogFile_CCM_provider_sam_ad65b206fd has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) running StageProviders(filename=CCM_provider_sample.xlsx)
ERROR: [pid 10904] Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) failed StageProviders(filename=CCM_provider_sample.xlsx)
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/luigi/worker.py", line 175, in run
raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
RuntimeError: Unfulfilled dependency at run time: ErrorsLogFile_CCM_provider_sam_ad65b206fd
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task StageProviders_CCM_provider_sam_ad65b206fd has status FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 1 pending tasks possibly being run by other workers
DEBUG: There are 1 pending tasks unique to this worker
DEBUG: There are 1 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=306235977, workers=1, host=ubuntu, username=root, pid=10904) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 4 tasks of which:
* 3 ran successfully:
- 1 DupsExistingLogFile(filename=CCM_provider_sample.xlsx)
- 1 DupsLogFile(filename=CCM_provider_sample.xlsx)
- 1 ErrorsLogFile(filename=CCM_provider_sample.xlsx)
* 1 failed:
- 1 StageProviders(filename=CCM_provider_sample.xlsx)
This progress looks :( because there were failed tasks
It appears this is because of this message:
RuntimeError: Unfulfilled dependency at run time: ErrorsLogFile_CCM_provider_sam_ad65b206fd
However, reading the output it seems ErrorsLogFile_CCM_provider_sam_ad65b206fd has finished before StageProviders is run?... Why is the scheduler returning unfulfilled dependency? I believe I'm misunderstanding how to "chain" tasks together. I simply want the StageProviders task to run following successful completion of the ErrorsLogFile, DupsLogFile, and DupsExistingLogFile tasks.
Upvotes: 4
Views: 2957
Reputation: 106
There is a typo in the output of your ErrorLogs (which is unfortunately not copied over to your run method)
[...] + timestr + "_Errprs.xlsx"))
Therefore, the task runs fine and gets status DONE but when StageProviders checks for its requirements, it calls the complete method of ErrorLogs which returns false because this file does not exist and hence this "Unfulfilled dependency at run time" error.
This error generally means that the status of the task changes during the execution of the workflow.
Upvotes: 1