turtle_in_mind

Reputation: 1152

Building a Prefect pipeline to run tasks forever

I am having trouble building a Prefect pipeline. Suppose I have two files, call them streamA.py and streamB.py. The purpose of these two files is to stream data continuously 24/7 and push the data into a Redis stream once every 500 records streamed.
I created another file called redis_to_postgres.py that asynchronously grabs all the data in the Redis streams, pushes it to PostgreSQL, and then frees the memory in Redis by deleting the stream IDs I just pushed. I want this to run every 15 minutes once the previous pipeline starts.
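For context, here is roughly what redis_to_postgres.py does. This is only a simplified sketch assuming redis.asyncio and asyncpg; the stream keys, table name, and connection strings are placeholders rather than my real code:

import asyncio
import json

import asyncpg
from redis import asyncio as aioredis

STREAMS = ["streamA", "streamB"]  # placeholder stream keys


async def flush_stream(r, pg, stream):
    entries = await r.xrange(stream)  # everything currently in the stream
    if not entries:
        return
    # push each entry to Postgres, keeping the stream id for traceability
    await pg.executemany(
        "INSERT INTO raw_events (stream, entry_id, payload) VALUES ($1, $2, $3)",
        [
            (
                stream,
                entry_id.decode(),
                json.dumps({k.decode(): v.decode() for k, v in fields.items()}),
            )
            for entry_id, fields in entries
        ],
    )
    # delete only the ids that were just pushed
    await r.xdel(stream, *[entry_id for entry_id, _ in entries])


async def main():
    r = aioredis.from_url("redis://localhost:6379")
    pg = await asyncpg.connect("postgresql://user:password@localhost/mydb")
    try:
        await asyncio.gather(*(flush_stream(r, pg, s) for s in STREAMS))
    finally:
        await pg.close()
        await r.close()


if __name__ == "__main__":
    asyncio.run(main())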

What would be the most practical way of doing this? Would I create 3 separate pipelines in this case: one for streamA, one for streamB, and a third to read from Redis, push to PostgreSQL, and finally clean up the data? Or would I create one pipeline that streams the data in parallel and another that just reads and pushes to Postgres? Thanks

Upvotes: 1

Views: 1199

Answers (1)

Anna Geller

Reputation: 1758

An interesting use case! Are you asking about Prefect ≤ 1.0 or about Orion? For Orion, there is a blog post that discusses the problem in more detail and shows an example flow.

But I’ll assume you’re asking about Prefect ≤ 1.0.

In order to read the data from Redis and load it to Postgres, say every 10 seconds, you could use a loop within your Prefect task:

import time

import prefect

# inside your @task-decorated function:
logger = prefect.context.get("logger")
for iteration in range(1, 7):
    logger.info("iteration nr %s", iteration)
    read_from_redis_and_load_to_postgres()  # your logic here
    if iteration < 6:
        logger.info("Sleeping for 10 seconds...")
        time.sleep(10)

And this flow could be scheduled to run every minute. This would give you retries, observability, and all the Prefect features, and loading data every 10 seconds to Postgres shouldn’t overwhelm your database.
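For illustration, here is a rough sketch of how that scheduled flow could look in Prefect 1.0; the flow name, retry settings, and the task body are placeholders, not a prescribed setup:

from datetime import timedelta

from prefect import Flow, task
from prefect.schedules import IntervalSchedule


@task(max_retries=3, retry_delay=timedelta(seconds=10))
def redis_to_postgres_batch():
    ...  # the 10-second loop from the snippet above goes here


# kick off a new flow run every minute
schedule = IntervalSchedule(interval=timedelta(minutes=1))

with Flow("redis-to-postgres", schedule=schedule) as flow:
    redis_to_postgres_batch()


if __name__ == "__main__":
    flow.run()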

But for the part where you receive real-time data and continuously load it into a Redis stream, you may want to run it as a separate service rather than a Prefect flow. Prefect 1.0 flows are designed more for batch processing and are expected to end at some point so that Prefect can tell whether the flow run was successful or not. If you had it as a Prefect flow that never ends, it could lose its flow heartbeat and get killed by the Zombie Killer process. So it may be easier to run this part as, e.g., a separate containerized service running 24/7. You could deploy it as a separate Kubernetes deployment or ECS service.
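As a rough illustration, such a standalone service could be as simple as the following sketch, assuming redis-py and a placeholder get_next_record() function standing in for your real-time source:

import redis

r = redis.Redis(host="localhost", port=6379)
BATCH_SIZE = 500  # matches the "once every 500 records" from the question


def get_next_record():
    ...  # placeholder: poll your real-time source here


buffer = []
while True:
    buffer.append(get_next_record())
    if len(buffer) >= BATCH_SIZE:
        # write the whole batch to the stream in one round trip
        pipe = r.pipeline()
        for record in buffer:
            pipe.xadd("streamA", {"payload": str(record)})
        pipe.execute()
        buffer.clear()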

It also depends on many factors, including what this code is doing and how reliable the source API is (does the source system from which you extract the data have rate limiting? why 500 records? how quickly do those 500 records fill up, and how frequently do you end up writing to Redis?).

Having said that, I would be curious to see whether you could implement it in Orion, similarly to what the blog post example does. We are currently collecting feedback about streaming use cases for Orion, so we would be interested to hear your thoughts if you implement this there.

Upvotes: 2
