sgruskin

Reputation: 51

How to create partitions with a schedule in Dagster?

I am trying to create partitions in Dagster that will let me run backfills. The documentation has an example, but it partitions by the days of the week (which I was able to replicate). I am trying to create partitions by date instead.

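from datetime import datetime, timedelta
from typing import List, Union

# Legacy (pipeline/solid era) Dagster API
from dagster import Partition, PartitionSetDefinition, ScheduleExecutionContext

DATE_FORMAT = "%Y-%m-%d"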
BACKFILL_DATE = "2021-04-01"
TODAY = datetime.today()


def get_number_of_days():
    backfill_date_obj = datetime.strptime(BACKFILL_DATE, DATE_FORMAT)
    delta = TODAY - backfill_date_obj

    return delta


def get_date_partitions():
    return [
        Partition(
            [
                datetime.strftime(TODAY - timedelta(days=x), DATE_FORMAT)
                for x in range(get_number_of_days().days)
            ]
        )
    ]


def run_config_for_date_partition(partition):
    date = partition.value
    return {"solids": {"data_to_process": {"config": {"date": date}}}}


# ----------------------------------------------------------------------
date_partition_set = PartitionSetDefinition(
    name="date_partition_set",
    pipeline_name="my_pipeline",
    partition_fn=get_date_partitions,
    run_config_fn_for_partition=run_config_for_date_partition,
)
# EXAMPLE CODE FROM DAGSTER DOCS.
# def weekday_partition_selector(
#     ctx: ScheduleExecutionContext, partition_set: PartitionSetDefinition
# ) -> Union[Partition, List[Partition]]:
#     """Maps a schedule execution time to the corresponding partition or list
#     of partitions that
#     should be executed at that time"""
#     partitions = partition_set.get_partitions(ctx.scheduled_execution_time)
#     weekday = ctx.scheduled_execution_time.weekday() if ctx.scheduled_execution_time else 0
#     return partitions[weekday]

# My attempt. I do not want to partition by weekday, just by date.
# Instead of returning the partitions from partition_set directly, I think
# I need to do something else with them, but I'm not sure what.
def daily_partition_selector(
    ctx: ScheduleExecutionContext, partition_set: PartitionSetDefinition
) -> Union[Partition, List[Partition]]:
    return partition_set.get_partitions(ctx.scheduled_execution_time)

my_schedule = date_partition_set.create_schedule_definition(
    "my_schedule",
    "15 8 * * *",
    partition_selector=daily_partition_selector,
    execution_timezone="UTC",
)
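The comment above asks what the selector should do instead of returning every partition. In the docs example, the selector returns a single partition chosen from the scheduled execution time; for date partitions, the analogous choice would be the partition whose value is the date string for that time. The sketch below is plain Python, not Dagster: `Partition` is a hypothetical stand-in for Dagster's class (only a `value` field), and `select_partition_for_time` is a hypothetical helper whose logic could sit inside `daily_partition_selector`, applied to the list returned by `partition_set.get_partitions(...)`. It assumes each partition already holds exactly one date string in `DATE_FORMAT`.

```python
from datetime import datetime
from typing import List, NamedTuple, Optional

DATE_FORMAT = "%Y-%m-%d"


class Partition(NamedTuple):
    """Hypothetical stand-in for dagster.Partition: holds a single value."""
    value: str


def select_partition_for_time(
    partitions: List[Partition], scheduled_time: Optional[datetime]
) -> Optional[Partition]:
    """Return the one partition whose date matches the scheduled run time.

    Mirrors the docs' weekday selector, but matches on the date string
    instead of indexing by weekday. Returns None if nothing matches.
    """
    if scheduled_time is None:
        return None
    target = scheduled_time.strftime(DATE_FORMAT)
    for partition in partitions:
        if partition.value == target:
            return partition
    return None


# Usage: three one-date partitions and a run scheduled at 08:15 on April 2.
parts = [Partition("2021-04-01"), Partition("2021-04-02"), Partition("2021-04-03")]
chosen = select_partition_for_time(parts, datetime(2021, 4, 2, 8, 15))
print(chosen.value)  # 2021-04-02
```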

In the current Dagster UI, all the dates are lumped together into a single entry in the partition section.

[Screenshot: Actual Results]

[Screenshot: Expected Results]

What am I missing that will give me the expected results?
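One likely cause, judging from the code above: `get_date_partitions` returns a list containing a single `Partition` whose `value` is the entire list of date strings, so there is only one partition to show. Each date needs its own `Partition`. The plain-Python sketch below contrasts the two shapes; `Partition` is a hypothetical stand-in for Dagster's class, and `lumped_partitions` / `build_date_partitions` are hypothetical helpers. In the question's code, the equivalent change would be to move the list comprehension outside `Partition(...)` so it yields one `Partition` per date. Note also that `range(get_number_of_days().days)` stops one day short of `BACKFILL_DATE`, so the sketch adds 1 to include both ends.

```python
from datetime import date, timedelta
from typing import List, NamedTuple

DATE_FORMAT = "%Y-%m-%d"


class Partition(NamedTuple):
    """Hypothetical stand-in for dagster.Partition: holds a single value."""
    value: object


def lumped_partitions(start: date, end: date) -> List[Partition]:
    """Shape of the original code: ONE partition whose value is every date."""
    days = (end - start).days + 1
    return [
        Partition([(end - timedelta(days=x)).strftime(DATE_FORMAT) for x in range(days)])
    ]


def build_date_partitions(start: date, end: date) -> List[Partition]:
    """One partition per calendar date, oldest first, both ends inclusive."""
    days = (end - start).days + 1
    return [
        Partition((start + timedelta(days=x)).strftime(DATE_FORMAT))
        for x in range(days)
    ]


start, end = date(2021, 4, 1), date(2021, 4, 3)
print(len(lumped_partitions(start, end)))  # 1
print([p.value for p in build_date_partitions(start, end)])
# ['2021-04-01', '2021-04-02', '2021-04-03']
```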

Upvotes: 3

Views: 1850

Answers (1)

sgruskin

Reputation: 51

After I talked to the folks at Dagster, they pointed me to this documentation on partition-based schedules: https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules#partition-based-schedules

This approach is much simpler. I ended up with the following:

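from datetime import datetime, time

from dagster import daily_schedule


@daily_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime(2021, 4, 1),
    execution_time=time(hour=8, minute=15),
    execution_timezone="UTC",
)
def my_schedule(date):
    # `date` is the start of the daily partition this run fills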
    return {
        "solids": {
            "data_to_process": {
                "config": {
                    "date": date.strftime("%Y-%m-%d")
                }
            }
        }
    }
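As I understand the linked docs, `@daily_schedule` builds one partition per day starting at `start_date` and calls the decorated function with each partition's date to produce that run's config, which is the one-date-per-partition shape the original code was missing. The sketch below mimics that mapping in plain Python so the generated run config can be inspected without Dagster; `daily_partition_dates` and `run_config_for` are hypothetical helpers, and the end date is passed in explicitly rather than computed the way Dagster does it.

```python
from datetime import datetime, timedelta
from typing import Dict, Iterator


def daily_partition_dates(start: datetime, end: datetime) -> Iterator[datetime]:
    """Yield one datetime per day from start to end, inclusive (hypothetical helper)."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)


def run_config_for(date: datetime) -> Dict[str, dict]:
    """Same body as the answer's my_schedule: one date -> config for one run."""
    return {
        "solids": {
            "data_to_process": {"config": {"date": date.strftime("%Y-%m-%d")}}
        }
    }


configs = [
    run_config_for(d)
    for d in daily_partition_dates(datetime(2021, 4, 1), datetime(2021, 4, 3))
]
for cfg in configs:
    print(cfg["solids"]["data_to_process"]["config"]["date"])
# 2021-04-01
# 2021-04-02
# 2021-04-03
```

Each date produces its own run config, so a backfill over a date range corresponds to launching one run per generated config.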

Upvotes: 2
