Fact

Reputation: 2460

Luigi - Unfulfilled %s at run time

I am trying to learn how luigi works using a very simple example. As a newbie, I came up with this code:

import luigi


class class1(luigi.Task):

    def requires(self):
        return class2()

    def output(self):
        return luigi.LocalTarget('class1.txt')

    def run(self):
        print('IN class A')


class class2(luigi.Task):

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget('class2.txt')


if __name__ == '__main__':
    luigi.run()

Running this from the command prompt fails at this line:

raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))

which is:

RuntimeError: Unfulfilled dependency at run time: class2__99914b932b  
   

Upvotes: 20

Views: 9396

Answers (4)

Raôny Traspadini

Reputation: 11

I had the same error, but I still haven't found the cause. This is my code:

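import logging

import joblib
import luigi
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeRegressor

# Assumed definition; the original post does not show how logger is created
logger = logging.getLogger(__name__)


class data_ingestion(luigi.Task):

    def run(self):
        # Raw string so the backslashes in the Windows path are not treated as escapes
        data = pd.read_csv(r'F:\Mega\MEGAsync\VS Code\winequality-red.csv', sep=';')
        data.to_csv(self.output().path, index=False)

    def output(self):
        return luigi.LocalTarget('WineQuality.csv')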

class data_prep(luigi.Task):

    def requires(self):
        return data_ingestion()

    def output(self):
        return [luigi.LocalTarget('Train.csv'), luigi.LocalTarget('Val.csv')]

    def run(self):
        data = pd.read_csv('WineQuality.csv')  # Reading from a CSV
        logger.info('\n Quick look at the data')
        data.head()

        column_target = 'quality'  # Variable to be predicted
        columns_features = data.drop([column_target], axis=1)

        logger.info(f'=== Variable to be predicted: {column_target}')
        logger.info(f'=== Available features: {columns_features}')

        logger.info('Splitting the dataset into TRAIN and TEST (validation)')
        data_train, data_val = train_test_split(data, test_size=0.2, stratify=data[column_target], random_state=1)

        logger.info("Saving Train file")
        data_train.to_csv(self.output()[0].path, index=False)

        logger.info("Saving Val file")
        data_val.to_csv(self.output()[1].path, index=False)
    

class training(luigi.Task):

    def requires(self):
        return data_prep()

    def output(self):
        return luigi.LocalTarget('joblibe_file')

    def run(self):

        data_train = pd.read_csv(self.input()[0].path)

        column_target = 'quality'  # Variable to be predicted
        data_features = data_train.drop([column_target], axis=1)
        columns_features = data_features.columns.to_list()

        X_train = data_train[columns_features].values
        Y_train = data_train[column_target].values

        model = DecisionTreeRegressor()  # No parameters set yet; I still need to study this properly
        model.fit(X_train, Y_train)

        # Saving the file to a working directory
        joblib_file = "joblib_model.pkl"
        joblib.dump(model, joblib_file)

    

class validation(luigi.Task):

    def requires(self):
        return training()

    def output(self):
        return luigi.LocalTarget('Metrics.csv')

    def run(self):
        data_val = pd.read_csv(self.input()[1].path)

        column_target = 'quality'  # Variable to be predicted
        data_features = data_val.drop([column_target], axis=1)
        columns_features = data_features.columns.to_list()

        X_val = data_val[columns_features].values
        Y_val = data_val[column_target].values

        # Loading the model saved during training
        joblib_model = joblib.load(self.input()[0].path)

        y_val_predict = joblib_model.predict(X_val)
        score = joblib_model.score(X_val, Y_val)

        logger.info('=== Predicted values')
        logger.info(y_val_predict)
        logger.info('=== Accuracy')
        logger.info('{:.2f} %'.format(score))

        dict = {'Predictions': [y_val_predict],
                'score': [score]}

        df = pd.DataFrame(dict)

        logger.info("Saving to a CSV file for TEST")
        df.to_csv(self.output()[0].path, index=False)

        # save several metrics to a DataFrame and export


if __name__ == '__main__':
    luigi.run()

Upvotes: 1

amardip kumar

Reputation: 27

This error occurs when a task declares an output that is never created. For example, if the output folder name is built from a timestamp, the timestamp changes every second, so the path is never the same twice and the declared output is never found. That can cause this error.
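To make the timestamp pitfall concrete, here is a minimal sketch in plain Python, with no luigi import. The functions unstable_output and stable_output are hypothetical stand-ins for a task's output() method, and the run_id argument is an assumed fixed value; in a real luigi task it would typically come from a task parameter.

```python
import os
import tempfile
import time
from datetime import datetime


def unstable_output(base_dir):
    # Hypothetical output(): the path is rebuilt from the clock on every call,
    # so a later existence check looks for a different file.
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S-%f")
    return os.path.join(base_dir, "result-" + stamp + ".txt")


def stable_output(base_dir, run_id):
    # Hypothetical output(): the path depends only on a fixed identifier,
    # so every call returns the same path.
    return os.path.join(base_dir, "result-" + run_id + ".txt")


def demo():
    with tempfile.TemporaryDirectory() as base_dir:
        # "Run" step writes to the path computed now.
        with open(unstable_output(base_dir), "w") as f:
            f.write("done\n")
        time.sleep(0.01)  # let the clock move on
        # "Is the output there?" step recomputes the path and misses the file.
        unstable_found = os.path.exists(unstable_output(base_dir))

        with open(stable_output(base_dir, "2020-01-01"), "w") as f:
            f.write("done\n")
        stable_found = os.path.exists(stable_output(base_dir, "2020-01-01"))
    return unstable_found, stable_found


if __name__ == "__main__":
    print(demo())
```

The first check fails even though a file was written, which is the situation that leads to the unfulfilled dependency; the second succeeds because the path is the same on every call.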

Upvotes: 1

KansaiRobot

Reputation: 9942

I am also a beginner at luigi. Thanks for pointing out this kind of error.

Following the previous answer, I managed to solve it by adding this to class2:

def run(self):
     _out = self.output().open('w')
     _out.write(u"Hello World!\n")
     _out.close()
     print('in class B')

Upvotes: 0

mjalajel

Reputation: 2201

This happens because you define an output for class2 but never create it.

Let's break it down...

When running

python file.py class2 --local-scheduler

luigi will ask:

  • is the output of class2 already on disk? NO
  • check dependencies of class2: NONE
  • execute the run method (by default it's an empty method that just does pass)
  • run method didn't return errors, so job finishes successfully.

However, when running

python file.py class1 --local-scheduler

luigi will ask:

  • is the output of class1 already on disk? NO
  • check task dependencies: YES: class2
  • pause to check status of class2
    • is the output of class2 on disk? NO
    • run class2 -> running -> done without errors
    • is the output of class2 on disk? NO -> raise error

luigi never runs a task unless all of its previous dependencies are met. (i.e. their output is on the file system)
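The steps above can be sketched with a toy model in plain Python. This is not luigi's implementation or API: ToyTask, execute, Class2NoRun and Class2Fixed are hypothetical names, and the check is simplified to "does the output file exist". It only illustrates why an empty run method in a dependency leads to the error, and why writing the output fixes it.

```python
import os
import tempfile


class ToyTask:
    """Hypothetical stand-in for a task; not luigi's real API."""

    def __init__(self, workdir):
        self.workdir = workdir

    def requires(self):
        return []

    def output(self):
        # Path of the file that marks this task as done.
        return os.path.join(self.workdir, type(self).__name__ + ".txt")

    def complete(self):
        return os.path.exists(self.output())

    def run(self):
        pass  # default: does nothing, so no output file is written


class Class2NoRun(ToyTask):
    pass  # declares an output (inherited) but never creates it


class Class2Fixed(ToyTask):
    def run(self):
        with open(self.output(), "w") as f:
            f.write("Hello World!\n")


class Class1(ToyTask):
    def __init__(self, workdir, dep):
        super().__init__(workdir)
        self.dep = dep

    def requires(self):
        return [self.dep]

    def run(self):
        with open(self.output(), "w") as f:
            f.write("IN class A\n")


def execute(task):
    if task.complete():
        return  # output already on disk, nothing to do
    for dep in task.requires():
        execute(dep)
    # Before running, every dependency's output must exist.
    missing = [type(d).__name__ for d in task.requires() if not d.complete()]
    if missing:
        raise RuntimeError("Unfulfilled dependency at run time: " + ", ".join(missing))
    task.run()


def demo():
    results = {}
    with tempfile.TemporaryDirectory() as workdir:
        try:
            execute(Class1(workdir, Class2NoRun(workdir)))
            results["without_run"] = "ok"
        except RuntimeError as err:
            results["without_run"] = str(err)
        fixed = Class1(workdir, Class2Fixed(workdir))
        execute(fixed)
        results["with_run"] = fixed.complete()
    return results


if __name__ == "__main__":
    print(demo())
```

In this toy model the dependency with the empty run finishes without raising anything, yet its output is still missing when the parent task is about to run, so the parent fails. Once the dependency writes its output file, the parent runs normally.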

Upvotes: 23
