Migwell

Producing files in dagster without caring about the filename

In the dagster tutorial, in the Materializations section, we choose a filename (sorted_cereals_csv_path) for our intermediate output and then yield it as a materialization:

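import csv
import os

from dagster import EventMetadataEntry, Materialization, Output, solid


@solid
def sort_by_calories(context, cereals):
    # Sort the rows (a list of dicts) by their calorie count
    sorted_cereals = sorted(cereals, key=lambda cereal: int(cereal['calories']))
    fieldnames = list(sorted_cereals[0].keys())

    # Intermediate output path, named after the run ID
    sorted_cereals_csv_path = os.path.abspath(
        'calories_sorted_{run_id}.csv'.format(run_id=context.run_id)
    )
    with open(sorted_cereals_csv_path, 'w') as fd:
        writer = csv.DictWriter(fd, fieldnames)
        writer.writeheader()
        writer.writerows(sorted_cereals)
    # Record where the file was written as a materialization event
    yield Materialization(
        label='sorted_cereals_csv',
        description='Cereals data frame sorted by caloric content',
        metadata_entries=[
            EventMetadataEntry.path(
                sorted_cereals_csv_path, 'sorted_cereals_csv_path'
            )
        ],
    )
    yield Output(None)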

However, this relies on a local filesystem being available (which may not be true), the file will likely get overwritten by later runs (which is not what I want), and it forces us to come up with a filename that will never be used.

What I'd like to do in most of my solids is just say "here is a file object, please store it for me", without concerning myself with where it's going to be stored. Can I materialize a file without considering all these things? Should I use Python's tempfile facility for this?
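For reference, Python's `tempfile` module does solve the naming part of the question: `tempfile.mkstemp` creates a new file with a unique, randomly generated name, so no name has to be invented and two runs cannot collide. A minimal sketch, assuming the rows are a list of dicts as in the tutorial; `write_rows_to_temp_csv` is a hypothetical helper. The file still lives on the local filesystem and is not deleted automatically, so this addresses only one of the three concerns above.

```python
import csv
import os
import tempfile


def write_rows_to_temp_csv(rows, prefix="calories_sorted_"):
    """Write a list of dicts to a uniquely named CSV file and return its path.

    Hypothetical helper: the caller never picks a filename, but the file is
    still on the local filesystem and must be removed by the caller.
    """
    # mkstemp creates and opens a new file whose name is guaranteed unique
    fd, path = tempfile.mkstemp(prefix=prefix, suffix=".csv")
    # Wrap the raw OS-level descriptor in a text-mode file object
    with os.fdopen(fd, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    return path


if __name__ == "__main__":
    cereals = [{"name": "Corn Flakes", "calories": "100"},
               {"name": "All-Bran", "calories": "70"}]
    first = write_rows_to_temp_csv(cereals)
    second = write_rows_to_temp_csv(cereals)
    print(first != second)  # each call gets its own file
    os.remove(first)
    os.remove(second)
```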

Answers (1)

Migwell

Actually, this seems to be answered by the output_materialization example.

Basically, you define a type:

from dagster import usable_as_dagster_type

# The two config functions referenced below must be defined before this class
@usable_as_dagster_type(
    name='LessSimpleDataFrame',
    description='A more sophisticated data frame that type checks its structure.',
    input_hydration_config=less_simple_data_frame_input_hydration_config,
    output_materialization_config=less_simple_data_frame_output_materialization_config,
)
class LessSimpleDataFrame(list):
    pass

This type has an output materialization config that reads the output path from the run config:

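import csv
import os

def less_simple_data_frame_output_materialization_config(
    context, config, value
):
    csv_path = os.path.abspath(config['csv']['path'])
    # Save data to this path (value is a list of dicts, one per CSV row)
    with open(csv_path, 'w') as fd:
        writer = csv.DictWriter(fd, fieldnames=list(value[0].keys()))
        writer.writeheader()
        writer.writerows(value)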
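How these pieces fit together can be illustrated without dagster at all. The following is a toy model, not dagster's implementation: the solid only returns a value, and a separate runner looks up the materializer attached to the value's type and calls it with that output's slice of the config. `with_materializer`, `MATERIALIZERS`, `describe_destination` and `run_solid` are hypothetical names; the stand-in materializer drops dagster's `context` argument and only reports where it would write, rather than writing.

```python
from typing import Any, Callable, Dict

# Hypothetical registry mapping a type to its materializer(config, value)
MATERIALIZERS: Dict[type, Callable[[Dict[str, Any], Any], str]] = {}


def with_materializer(materializer):
    """Hypothetical class decorator, loosely mirroring the
    output_materialization_config argument of usable_as_dagster_type."""
    def decorate(cls):
        MATERIALIZERS[cls] = materializer
        return cls
    return decorate


def describe_destination(config, value):
    # Stand-in materializer: a real one would write `value` to the path;
    # this one only reports the destination, so the model has no side effects
    return "{} rows -> {}".format(len(value), config["csv"]["path"])


@with_materializer(describe_destination)
class LessSimpleDataFrame(list):
    pass


def run_solid(solid_fn, output_config):
    """Run a 'solid', then let the output's type decide how to store it."""
    value = solid_fn()
    materializer = MATERIALIZERS.get(type(value))
    if materializer is None or output_config is None:
        return None  # nothing configured: the value is not materialized
    return materializer(output_config, value)


def sort_by_calories():
    # The solid neither knows nor cares where its output will be stored
    rows = [{"calories": 110}, {"calories": 70}]
    return LessSimpleDataFrame(sorted(rows, key=lambda row: row["calories"]))


print(run_solid(sort_by_calories, {"csv": {"path": "cereal_out.csv"}}))
# 2 rows -> cereal_out.csv
```

The design choice being modelled is that the storage decision lives with the type and the run config, so the same solid can be stored differently, or not at all, depending only on the config it is run with.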

And you specify this path in the config:

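from dagster import execute_pipeline

# output_materialization_pipeline is the pipeline defined in the example
execute_pipeline(
    output_materialization_pipeline,
    {
        'solids': {
            'sort_by_calories': {
                'outputs': [
                    {'result': {'csv': {'path': 'cereal_out.csv'}}}
                ],
            }
        }
    },
)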

You still have to come up with a filename for each intermediate output, but you do it in the run config, which can differ from run to run, rather than in the pipeline definition itself.
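Because the run config is just a Python dict, the per-run filename does not have to be typed by hand either; it can be generated when the run is launched. A minimal sketch, assuming the config shape shown above and a writable output directory; `build_run_config` is a hypothetical helper, and its result would be passed as the second argument to `execute_pipeline`. The file still ends up on the local filesystem, which was one of the original concerns.

```python
import os
import uuid


def build_run_config(output_dir, solid_name="sort_by_calories"):
    """Return a run config whose CSV path is unique to this run.

    Hypothetical helper: uuid4 gives every run a different filename, so
    earlier outputs are not overwritten and nobody has to pick a name.
    """
    filename = "cereal_out_{}.csv".format(uuid.uuid4().hex)
    path = os.path.join(output_dir, filename)
    return {
        "solids": {
            solid_name: {
                # 'result' is the default output name of a dagster solid
                "outputs": [{"result": {"csv": {"path": path}}}],
            }
        }
    }


def csv_path_of(run_config, solid_name="sort_by_calories"):
    # Dig the generated path back out of the nested config
    return run_config["solids"][solid_name]["outputs"][0]["result"]["csv"]["path"]


first = build_run_config("/tmp/outputs")
second = build_run_config("/tmp/outputs")
print(csv_path_of(first) != csv_path_of(second))  # each run gets its own file
```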
