Reputation: 23
I have a specific ETL problem: my data comes from a paginated resource and can be fetched in pages of fixed size, so each API call specifies page 1, page 2, page 3, and so on. According to my plan, each page is then transformed and loaded.
So the ETL of a single page is an etl_batch flow (of atomic (page)tasks), but I would also like to run the ETL over the whole paginated resource, up to the last available page. I thought this could be solved by a flow with a loop (called every hour, for example) over etl_batch flows (called every couple of seconds). Based on the references below I realized the model has to be cast as a flow of a task of (page)flows of (page)tasks.
docs.prefect.io/core/advanced_tutorials/task-looping.html
stackoverflow.com/questions/68103561/looping-tasks-in-prefect
Here's the issue I encountered: if I specify a schedule for the etl_batch flow, its state appears to be reset. But if I comment out the etl_batch flow's schedule schedule_every_second, the loop works correctly, i.e. page is incremented (please see the snippet below). However, for this ETL pipeline it is important to run the etl_batch flow and its (page)tasks on a schedule, to avoid saturating the API.
I assume the problem is that the cache is cleared when the etl_batch flow is run on a schedule (I tried inserting cache_for= here and there, to no avail). Do you have any suggestions?
from datetime import timedelta

from prefect import task, Flow, context, Parameter
from prefect.schedules import IntervalSchedule
from prefect.engine.signals import LOOP

schedule = IntervalSchedule(
    start_date=None,
    interval=timedelta(seconds=3),
)

schedule_every_second = IntervalSchedule(
    start_date=None,
    interval=timedelta(seconds=1),
)


@task
def next_page(page):
    print(f" {page}")
    return page + 1


@task
def paginate():
    # page number carried over from the previous LOOP iteration (defaults to 1)
    current_page = context.get("task_loop_result", 1)

    with Flow('etl_batch',
              # schedule=schedule_every_second
              ) as flow:
        x = Parameter('x')
        add = next_page(x)

    # run the per-page sub flow and pull the next page number out of its state
    state = flow.run(parameters={'x': current_page})
    nr = state.result[add]._result.value

    if nr > 3:
        return nr
    raise LOOP(result=nr)


if __name__ == "__main__":
    with Flow("etl", schedule=schedule) as etl_flow:
        r = paginate()

    f = etl_flow.run()
Upvotes: 1
Views: 833
Reputation: 71
You are trying to use both task looping and a scheduled flow to fetch from your external resource multiple times; you should only need one. Use task looping to iterate a task n times. Reasons below.
Calling flow.run() on a scheduled flow runs forever, so your program never moves past the line where flow.run(parameters={'x': current_page}) is called. The surrounding task isn't looping at all; you are just calling the same flow with the same x=current_page value over and over again, on a schedule.
You should remove the schedule from your sub flow and simply use time.sleep(1) to enforce a waiting period. Then you can use loop results to collect your paginated resources, as described in the task-looping documentation you linked.
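A minimal sketch of that approach, assuming Prefect 1.x (the version in your snippet): a single flow whose looping task handles one page per iteration, with the page limit of 3 mirroring your code and time.sleep(1) standing in for whatever throttling your API needs.

import time

from prefect import task, Flow, context
from prefect.engine.signals import LOOP


@task
def paginate():
    # page number carried over from the previous LOOP iteration (defaults to 1)
    page = context.get("task_loop_result", 1)

    # the actual extract/transform/load work for this page would go here
    print(f"processing page {page}")

    # throttle API calls instead of scheduling a sub flow
    time.sleep(1)

    if page >= 3:
        return page  # last page reached, stop looping
    raise LOOP(result=page + 1)  # re-run this task with the next page number


if __name__ == "__main__":
    with Flow("etl") as etl_flow:
        paginate()

    etl_flow.run()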
Upvotes: 1