Reputation: 51
I am trying to create partitions within Dagster that will allow me to do backfills. The documentation has an example, but it uses the days of the week (which I was able to replicate). However, I am trying to create partitions by date.
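from datetime import datetime, timedelta
from typing import List, Union

from dagster import Partition, PartitionSetDefinition, ScheduleExecutionContext

DATE_FORMAT = "%Y-%m-%d"
BACKFILL_DATE = "2021-04-01"
TODAY = datetime.today()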

def get_number_of_days():
    backfill_date_obj = datetime.strptime(BACKFILL_DATE, DATE_FORMAT)
    delta = TODAY - backfill_date_obj
    return delta

def get_date_partitions():
    return [
        Partition(
            [
                datetime.strftime(TODAY - timedelta(days=x), DATE_FORMAT)
                for x in range(get_number_of_days().days)
            ]
        )
    ]

def run_config_for_date_partition(partition):
    date = partition.value
    return {"solids": {"data_to_process": {"config": {"date": date}}}}
# ----------------------------------------------------------------------
date_partition_set = PartitionSetDefinition(
    name="date_partition_set",
    pipeline_name="my_pipeline",
    partition_fn=get_date_partitions,
    run_config_fn_for_partition=run_config_for_date_partition,
)
# EXAMPLE CODE FROM DAGSTER DOCS.
# def weekday_partition_selector(
#     ctx: ScheduleExecutionContext, partition_set: PartitionSetDefinition
# ) -> Union[Partition, List[Partition]]:
#     """Maps a schedule execution time to the corresponding partition or list
#     of partitions that should be executed at that time"""
#     partitions = partition_set.get_partitions(ctx.scheduled_execution_time)
#     weekday = ctx.scheduled_execution_time.weekday() if ctx.scheduled_execution_time else 0
#     return partitions[weekday]

# My attempt. I do not want to partition by the weekday name, but just by the date.
# Instead of returning the partition set's partitions directly, I think I need to
# do something else with them, but I'm not sure what.
def daily_partition_selector(
    ctx: ScheduleExecutionContext, partition_set: PartitionSetDefinition
) -> Union[Partition, List[Partition]]:
    return partition_set.get_partitions(ctx.scheduled_execution_time)

my_schedule = date_partition_set.create_schedule_definition(
    "my_schedule",
    "15 8 * * *",
    partition_selector=daily_partition_selector,
    execution_timezone="UTC",
)
Currently, the Dagster UI shows all the dates lumped together in the partition section.
What am I missing that will give me the expected results?
Upvotes: 3
Views: 1850
Reputation: 51
After talking to the folks at Dagster, they pointed me to this documentation: https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules#partition-based-schedules
This is so much simpler, and I ended up with:
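from datetime import datetime, time

from dagster import daily_schedule

@daily_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime(2021, 4, 1),
    execution_time=time(hour=8, minute=15),
    execution_timezone="UTC",
)
def my_schedule(date):
    # Build the run config for the partition date passed in by the schedule.
    return {
        "solids": {
            "data_to_process": {
                "config": {
                    "date": date.strftime("%Y-%m-%d")
                }
            }
        }
    }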
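For anyone who still wants a hand-rolled partition set: as far as I can tell, the lumping in my original attempt came from get_date_partitions wrapping the whole list of dates in a single Partition, so there was only ever one partition. Below is a minimal sketch in plain Python (no Dagster import) of producing one date string per day instead. The helper names date_partition_keys and run_config_for_date are hypothetical, made up for illustration, and the sketch counts forward from the start date rather than backward from today as my original code did.

```python
from datetime import date, timedelta
from typing import List

DATE_FORMAT = "%Y-%m-%d"


def date_partition_keys(start: date, end: date) -> List[str]:
    """Return one formatted date string per day in the half-open range [start, end)."""
    num_days = (end - start).days
    return [
        (start + timedelta(days=offset)).strftime(DATE_FORMAT)
        for offset in range(num_days)
    ]


def run_config_for_date(date_str: str) -> dict:
    """Build the same run config shape as in the question, for a single date."""
    return {"solids": {"data_to_process": {"config": {"date": date_str}}}}


keys = date_partition_keys(date(2021, 4, 1), date(2021, 4, 4))

# Assumption: with Dagster installed, each key would get its own Partition,
# e.g. [Partition(key) for key in keys], instead of one Partition([...])
# whose value is the entire list of dates.
configs = [run_config_for_date(key) for key in keys]
```

Each entry in keys then maps to exactly one run config, which is what a backfill needs in order to select individual dates.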
Upvotes: 2