Imad

Reputation: 2751

How to use Dagster with Great Expectations?

The issue

I'm trying out great expectations with dagster, as per this guide

My pipeline seems to execute correctly until it reaches this block:

expectation = dagster_ge.ge_validation_op_factory(
    name='ge_validation_op',
    datasource_name='dev.data-pipeline-data-storage.data_pipelines.raw_data.sirene_update',
    suite_name='suite.data_pipelines.raw_data.sirene_update',
)

if expectation["success"]:
    print("Success")

Trying to access expectation["success"] results in a

# TypeError: 'SolidDefinition' object is not subscriptable

When I look inside the code of ge_validation_op_factory, there is a _ge_validation_fn that should yield an ExpectationResult, but somehow it gets converted into a SolidDefinition...

Dagster version = 0.15.9; Great Expectations version = 0.15.44

Code to reproduce the error

In my code I interact with an S3 bucket, so it is a bit tedious to re-create a minimal example, but here it is anyway:

In gx_postprocessing.py:

import json
import boto3
import dagster_ge
from dagster import (
    op,
    graph,
    Field,
    String,
    OpExecutionContext,
)

from typing import List, Dict


@op(
    config_schema={
        "bucket": Field(
            String,
            description="s3 bucket name",
        ),
        "path_in_s3": Field(
            String,
            description="Prefix representing the path to data",
        ),
        "technical_date": Field(
            String,
            description="date string to fetch data",
        ),
        "file_name": Field(
            String,
            description="file name that contains the data",
        ),
    }
)
def read_in_json_datafile_from_s3(context: OpExecutionContext):
    bucket = context.op_config["bucket"]
    path_in_s3 = context.op_config["path_in_s3"]
    technical_date = context.op_config["technical_date"]
    file_name = context.op_config["file_name"]

    object = f"{path_in_s3}/" f"technical_date={technical_date}/" f"{file_name}"

    s3 = boto3.resource("s3")

    content_object = s3.Object(bucket, object)
    file_content = content_object.get()["Body"].read().decode("utf-8")
    json_content = json.loads(file_content)

    return json_content


@op
def process_example_dq(data: List[Dict]):
    return len(data)


@op
def postprocess_example_dq(numrows, expectation):
    if expectation["success"]:
        return numrows
    else:
        raise ValueError


@op
def validate_example_dq(context: OpExecutionContext):

    expectation = dagster_ge.ge_validation_op_factory(
        name='ge_validation_op',
        datasource_name='my_bucket.data_pipelines.raw_data.example_update',
        suite_name='suite.data_pipelines.raw_data.example_update',
    )

    return expectation


@graph(
    config={
        "read_in_json_datafile_from_s3": {
            "config": {
                "bucket": "my_bucket",
                "path_in_s3": "my_path",
                "technical_date": "2023-01-24",
                "file_name": "myfile_20230124.json",
            }
        },
    },
)
def example_update_evaluation():
    output_dict = read_in_json_datafile_from_s3()
    nb_items = process_example_dq(data=output_dict)
    expectation = validate_example_dq()
    postprocess_example_dq(
        numrows=nb_items,
        expectation=expectation,
    )

Do not forget to add great_expectations_poc_pipeline to your __init__.py where the pipelines=[...] are listed.
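For reference, a minimal sketch of what that __init__.py registration might look like in my setup (assuming a @repository-based layout; the module and job names here are just placeholders):

import json

# hypothetical __init__.py sketch -- assumes the graph above lives in gx_postprocessing.py
from dagster import repository

from .gx_postprocessing import example_update_evaluation


@repository
def data_pipelines():
    # expose the graph as a runnable job; run config can also be supplied via to_job(config=...)
    return [example_update_evaluation.to_job(name="example_update_evaluation_job")]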

Upvotes: 0

Views: 542

Answers (1)

Owen Kephart

Reputation: 201

In this example, dagster_ge.ge_validation_op_factory(...) returns an OpDefinition, which is the same kind of object as (for example) process_example_dq, and it should be composed in the graph definition in the same way, rather than invoked inside another op.

So instead, you'd want to have something like:

validate_example_dq = dagster_ge.ge_validation_op_factory(
    name='ge_validation_op',
    datasource_name='my_bucket.data_pipelines.raw_data.example_update',
    suite_name='suite.data_pipelines.raw_data.example_update',
)

Then use that op inside your graph definition the same way you currently are (i.e., expectation = validate_example_dq()).
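For illustration, a minimal sketch of the corrected wiring (names taken from the question; this assumes the factory-produced op takes the loaded data as its input, as in the dagster-ge examples, and that the required ge_data_context resource is configured on the job built from this graph):

import dagster_ge
from dagster import graph

# the validation op is created once at module level, not inside another op
validate_example_dq = dagster_ge.ge_validation_op_factory(
    name='ge_validation_op',
    datasource_name='my_bucket.data_pipelines.raw_data.example_update',
    suite_name='suite.data_pipelines.raw_data.example_update',
)


@graph
def example_update_evaluation():
    output_dict = read_in_json_datafile_from_s3()
    nb_items = process_example_dq(data=output_dict)
    # the GE validation op is now composed as a node in the graph,
    # rather than being invoked inside another op
    expectation = validate_example_dq(output_dict)
    postprocess_example_dq(
        numrows=nb_items,
        expectation=expectation,
    )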

Upvotes: 1
