Gilad

Reputation: 133

Using airflow to run spark streaming jobs?

We have both Spark batch jobs and Spark streaming jobs in our Hadoop cluster.

We would like to schedule and manage them both on the same platform.

We came across Airflow, which fits our need for a "platform to author, schedule, and monitor workflows".

I just want to be able to stop and start a Spark streaming job; Airflow's graphs and profiling are less of a concern.

My question is: aside from losing some functionality (graphs, profiling), why shouldn't I use Airflow to run Spark streaming jobs?

I came across this question: Can airflow be used to run a never ending task?

which says it's possible, but not why you shouldn't.

Upvotes: 12

Views: 12993

Answers (3)

Aleksejs R

Reputation: 517

Using Airflow's branching functionality, we can have one DAG that does both the scheduling and the monitoring of our streaming job. The DAG does a status check of the application, and if the application is not running, the DAG submits the streaming job. Otherwise the DAG run can simply finish, or you can add a sensor that checks the streaming job's status after some time, with alerting and whatever else you need.

There are two main problems:

  1. Submitting the streaming application without waiting for it to finish; otherwise our operator will run until it reaches execution_timeout.

That problem can be solved by submitting our streaming job in cluster mode with the spark.yarn.submit.waitAppCompletion configuration parameter set to false.

  2. Checking the status of our streaming application.

We can check the streaming application's status using YARN, for example with the command yarn application -list -appStates RUNNING. If our application is among the running applications, we should not trigger the streaming job again. The only requirement is to give the streaming job a unique name. A sketch of such a DAG follows below.
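A minimal sketch of that pattern (assuming Airflow 2.x operator imports; the application name, schedule, script path, and spark-submit arguments are placeholders):

    # Hedged sketch of the branching approach described above.
    import subprocess
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.empty import EmptyOperator
    from airflow.operators.python import BranchPythonOperator

    STREAMING_APP_NAME = "my-streaming-app"  # must be unique on the cluster

    def choose_branch():
        # Ask YARN for running applications and branch on whether ours is there.
        out = subprocess.run(
            ["yarn", "application", "-list", "-appStates", "RUNNING"],
            capture_output=True, text=True, check=True,
        ).stdout
        return "already_running" if STREAMING_APP_NAME in out else "submit_streaming_job"

    with DAG(
        dag_id="spark_streaming_keepalive",
        start_date=datetime(2024, 1, 1),
        schedule_interval="*/10 * * * *",  # re-check every 10 minutes
        catchup=False,
    ) as dag:
        check_status = BranchPythonOperator(
            task_id="check_status",
            python_callable=choose_branch,
        )

        # Submit in cluster mode and return immediately instead of waiting
        # for the never-ending application to finish.
        submit_streaming_job = BashOperator(
            task_id="submit_streaming_job",
            bash_command=(
                "spark-submit --master yarn --deploy-mode cluster "
                f"--name {STREAMING_APP_NAME} "
                "--conf spark.yarn.submit.waitAppCompletion=false "
                "/path/to/streaming_job.py"
            ),
        )

        already_running = EmptyOperator(task_id="already_running")

        check_status >> [submit_streaming_job, already_running]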

Upvotes: 4

Ryan

Reputation: 299

@mMorozonv's answer looks good. You could have one DAG start the stream if it does not exist, then a second DAG as a health checker to track its progress. If the health check fails, you could trigger the first DAG again.

Alternatively, you can run the stream with a trigger interval of once [1].

# Load your streaming DataFrame
sdf = spark.readStream.load(path="data/", format="json", schema=my_schema)
# Perform transformations, then write; the checkpoint location is what lets
# the next run pick up only the files that arrived since the last one
sdf.writeStream.trigger(once=True) \
    .option("checkpointLocation", "/checkpoint/path") \
    .start(path="/out/path", format="parquet")

This gives you all the benefits of Spark streaming, with the flexibility of batch processing.

You can simply point the stream at your data, and the job will detect all the new files since the last iteration (using checkpointing), run one streaming batch, then terminate. You can then set your Airflow DAG's schedule to suit whatever lag you'd like to process data at (every minute, hour, etc.).

I wouldn't recommend this for low-latency requirements, but it's well suited to running every minute.
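For completeness, a hedged sketch of wiring such a trigger-once job into an Airflow schedule (assuming the apache-airflow-providers-apache-spark package is installed; the application path, connection id, and schedule are placeholders):

    # Hypothetical DAG that runs the trigger-once streaming job on a fixed cadence.
    from datetime import datetime

    from airflow import DAG
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

    with DAG(
        dag_id="trigger_once_stream",
        start_date=datetime(2024, 1, 1),
        schedule_interval="@hourly",  # pick whatever lag you can tolerate
        catchup=False,
    ) as dag:
        run_stream_batch = SparkSubmitOperator(
            task_id="run_stream_batch",
            application="/jobs/trigger_once_stream.py",  # the script shown above
            conn_id="spark_default",
            name="trigger_once_stream",
        )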

[1] https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html

Upvotes: 7

iamorozov

Reputation: 811

There are no strict reasons why you shouldn't use Airflow to run a Spark Streaming job. In fact, you can monitor your process by periodically logging some metrics with

LOG.info(query.lastProgress)
LOG.info(query.status)

and viewing them in the Airflow task log.
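A minimal sketch of that idea, assuming a started StreamingQuery; the paths, schema, logger name, and polling interval are illustrative:

    # Start a streaming query, then periodically log its metrics so they
    # show up in the Airflow task log.
    import logging
    import time

    from pyspark.sql import SparkSession

    LOG = logging.getLogger("streaming_monitor")

    spark = SparkSession.builder.appName("monitored-stream").getOrCreate()

    query = (
        spark.readStream.format("json")
        .schema("id INT, value STRING")   # placeholder schema
        .load("data/")
        .writeStream.format("parquet")
        .option("checkpointLocation", "/checkpoints/monitored-stream")
        .start("/out/path")
    )

    while query.isActive:
        LOG.info(query.lastProgress)  # metrics of the last completed micro-batch
        LOG.info(query.status)        # current state of the query
        time.sleep(60)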

Upvotes: 1
