Reputation: 77
I am new to Dagster and am trying to understand how it handles user input. I am testing this with the following piece of code:
from dagster import job, op

@op
def input_string():
    ret = input('Enter string')
    print(ret)

@job
def my_job():
    input_string()

if __name__ == '__main__':
    my_job.execute_in_process()
I then run the following in console:
dagit -f test.py
When I finally "Launch Run" however, I don't get an opportunity to enter input, and instead get an EOFError with the following info:
dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "input_string":
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\execute_plan.py", line 232, in dagster_event_sequence_for_step
    for step_event in check.generator(step_events):
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\execute_step.py", line 354, in core_dagster_event_sequence_for_step
    for user_event in check.generator(
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\execute_step.py", line 70, in _step_output_error_checked_user_event_sequence
    for user_event in user_event_sequence:
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 170, in execute_core_compute
    for step_output in yield_compute_results(step_context, inputs, compute_fn):
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\compute.py", line 138, in yield_compute_results
    for event in iterate_with_context(
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\utils\__init__.py", line 403, in iterate_with_context
    return
  File "C:\Users\username\Anaconda3\lib\contextlib.py", line 137, in __exit__
    self.gen.throw(typ, value, traceback)
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\utils.py", line 73, in solid_execution_error_boundary
    raise error_cls(
The above exception was caused by the following exception:
EOFError: EOF when reading a line
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\utils.py", line 47, in solid_execution_error_boundary
    yield
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\utils\__init__.py", line 401, in iterate_with_context
    next_output = next(iterator)
  File "C:\Users\username\Anaconda3\lib\site-packages\dagster\core\execution\plan\compute_generator.py", line 65, in _coerce_solid_compute_fn_to_iterator
    result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
  File "test.py", line 14, in input_string
    ret = input('Enter string')
How can I get this to run?
Upvotes: 3
Views: 1969
Reputation: 925
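Ops are configured using a config schema. This allows you to provide configuration via the Dagit Launchpad.
In your case you'd want to remove the input() call from your @op code. A run launched from Dagit most likely has no interactive console attached to its standard input, so input() has nothing to read and raises the EOFError you are seeing. You would then retrieve the value from the config object provided to your op, using the context.op_config dictionary, something like this: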
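from dagster import job, op

@op(config_schema={'input1': str})
def input_string(context):
    # Read the value from the run config instead of calling input().
    ret = context.op_config['input1']
    print(ret)

@job
def my_job():
    input_string()

if __name__ == '__main__':
    # 'input1' is required, so a direct run must supply it; 'hello' is a placeholder.
    my_job.execute_in_process(
        run_config={'ops': {'input_string': {'config': {'input1': 'hello'}}}}
    )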
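To make the shape of that configuration concrete, here is a minimal sketch of the nested structure Dagster expects for op config in a job. The helper name build_run_config is hypothetical (it is not part of Dagster) and 'hello' is a placeholder value. The comment shows what the same config would look like as YAML typed into the Launchpad.

```python
def build_run_config(op_name, values):
    """Hypothetical helper: nest op config under the keys a job's run config uses.

    Equivalent YAML for the Dagit Launchpad, assuming an op named
    'input_string' with a single 'input1' field:

        ops:
          input_string:
            config:
              input1: hello
    """
    return {"ops": {op_name: {"config": dict(values)}}}


# Placeholder value; in Dagit you would type it into the Launchpad instead.
run_config = build_run_config("input_string", {"input1": "hello"})
print(run_config)
```

The resulting dictionary can be passed when running outside Dagit, for example my_job.execute_in_process(run_config=run_config).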
Edit: To get your input to show up in the run's log in Dagit, use the built-in Dagster logger instead of print, like this:
@op(config_schema={'input1': str})
def input_string(context):
    ret = context.op_config['input1']
    context.log.info(ret)
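For context on the EOFError in the question, it can be reproduced without Dagster at all. The sketch below uses only the standard library and assumes the run's standard input behaves like an empty stream, which is an approximation of what happens when a process has no console to read from. The function name read_line_from is made up for this illustration.

```python
import io
import sys


def read_line_from(stream):
    """Call input() with sys.stdin temporarily replaced by `stream`."""
    original_stdin = sys.stdin
    sys.stdin = stream
    try:
        return input()
    except EOFError as exc:
        # input() raises EOFError when the stream is exhausted before a line is read.
        return f"EOFError: {exc}"
    finally:
        # Always restore the real stdin, even if something unexpected happens.
        sys.stdin = original_stdin


# A stream that contains a line behaves like a user typing at a console.
print(read_line_from(io.StringIO("hello\n")))

# An empty stream stands in for a process with nothing attached to stdin.
print(read_line_from(io.StringIO("")))
```

This is why moving the value into op config is the more reliable route: the op no longer depends on a console being present.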
Upvotes: 1