user13933695

Reputation: 41

Prefect Task Scheduling

I am new to Prefect, having worked mostly with Airflow. I have put together a workflow that executes fine, but the tasks don't execute in the order I expect. The flow is here:

with Flow(name='4chan_extract') as flow:
    board_param = Parameter(name='board_name', required = True, default='pol')
    getData(board= board_param)
    checkDB(url= 'postgresql://postgres:user@localhost:5434/postgres')
    upload_raw(url="postgresql://postgres:user@localhost:5434/postgres",
               board=board_param)
    remove_dupes(board=board_param)

However, when I call flow.visualize() on this flow, the DAG looks really odd.

My understanding is that the with context manager sets the order? Using up_stream in each task didn't help.

Any help is appreciated.

Upvotes: 1

Views: 854

Answers (1)

Anna Geller

Reputation: 1758

The with Flow(...) block only registers the tasks; it does not order them by the sequence in which they are called. If you want your tasks to run sequentially, one after the other, pass upstream_tasks to each task call. Additionally, to pass state dependencies easily, assign the result of each task call to a variable (data = get_data(board=board_param)); you can then pass that reference to downstream tasks.

I can only guess what you want this flow to look like, but assuming you want it to run sequentially, here is a full example and a DAG visualization:

from prefect import task, Flow, Parameter


@task
def get_data(board):
    pass


@task
def check_db(url):
    pass


@task
def upload_raw(url, board):
    pass


@task
def remove_duplicates(board):
    pass


with Flow(name="4chan_extract") as flow:
    board_param = Parameter(name="board_name", required=True, default="pol")
    data = get_data(board=board_param)
    check = check_db(
        url="postgresql://postgres:user@localhost:5434/postgres", upstream_tasks=[data]
    )
    upload = upload_raw(
        url="postgresql://postgres:user@localhost:5434/postgres",
        board=board_param,
        upstream_tasks=[check],
    )
    remove_duplicates(board=board_param, upstream_tasks=[upload])

if __name__ == "__main__":
    flow.visualize()

[Image: DAG visualization of the sequential flow]
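
To see why the original flow came out unordered, it can help to model the dependency edges outside of Prefect. The sketch below is not Prefect code: it uses the standard library's graphlib (Python 3.9+) as a stand-in, and the two edge dictionaries are my assumption about what each version of the flow declares (in the original, only the Parameter feeds tasks, and check_db depends on nothing). The helper name ready_groups is hypothetical.

```python
from graphlib import TopologicalSorter

# Assumed edges of the original flow: each key maps to its upstream nodes.
# Only the Parameter creates dependencies; check_db has none at all.
original = {
    "get_data": {"board_name"},
    "check_db": set(),
    "upload_raw": {"board_name"},
    "remove_duplicates": {"board_name"},
}

# Assumed edges once upstream_tasks chains the tasks together.
chained = {
    "get_data": {"board_name"},
    "check_db": {"get_data"},
    "upload_raw": {"board_name", "check_db"},
    "remove_duplicates": {"board_name", "upload_raw"},
}


def ready_groups(graph):
    """Group nodes by the step at which all their upstream nodes are done."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    groups = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        groups.append(ready)
        sorter.done(*ready)
    return groups


original_groups = ready_groups(original)
chained_groups = ready_groups(chained)
print(original_groups)
print(chained_groups)
```

In the first graph several tasks become runnable at the same step, so nothing fixes their relative order; in the second, each step contains exactly one node, which is the sequential behaviour the upstream_tasks arguments are meant to produce.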

Upvotes: 2
