Reputation: 373
I have a Spark job that runs via a Kubernetes pod. Until now I was using a YAML file to run my jobs manually. Now I want to schedule my Spark jobs via Airflow. This is the first time I am using Airflow, and I am unable to figure out how I can add my YAML file to Airflow. From what I have read, I can schedule my jobs via a DAG in Airflow. A DAG example is this:
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'test',
    'start_date': datetime(2019, 4, 3),
    'retries': 2,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('test_dag', default_args=args, catchup=False)

def print_text1():
    print("Hello-World1")

def print_text():
    print('Hello-World2')

t1 = PythonOperator(task_id='multitask1', python_callable=print_text1, dag=dag)
t2 = PythonOperator(task_id='multitask2', python_callable=print_text, dag=dag)
t1 >> t2
In this case the above methods will get executed one after the other once I trigger the DAG. Now, in case I want to run a spark-submit job, what should I do? I am using Spark 2.4.4.
Upvotes: 8
Views: 6162
Reputation: 164
As of 2023, we have a new option to run Spark jobs on Kubernetes using the SparkKubernetesOperator. Refer to: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
In Airflow we can use the SparkKubernetesOperator and provide the Spark job details in a .yaml file. The YAML file creates the driver and executor pods that run the Spark job.
Airflow Task:
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

spark_operator = SparkKubernetesOperator(
    dag=spark_operator_test,
    task_id="spark_operator_task",
    application_file='spark_app_config.yaml',
    namespace="spark-apps",
    on_success_callback=spark_success_alert,
    on_failure_callback=spark_fail_alert,
)
Sample YAML file:
#
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi-1
  namespace: spark-apps
spec:
  type: Scala
  mode: cluster
  image: "apache/spark-py:v3.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.4.0.jar"
  sparkVersion: "3.4.0"
  sparkConf:
    "spark.eventLog.enabled": "true"
    "spark.eventLog.dir": "/mnt/data/eventLogs"
    "spark.driver.log.persistToDfs.enabled": "true"
    "spark.driver.log.dfsDir": "/mnt/data/eventLogs"
  restartPolicy:
    type: Never
  volumes:
    - name: "data-volume"
      persistentVolumeClaim:
        claimName: pvc-bifrost-spark-data
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.4.0
    serviceAccount: spark-apps-spark
    # podSecurityContext:
    #   fsGroup: 0
    volumeMounts:
      - name: "data-volume"
        mountPath: "/mnt/data"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.4.0
    deleteOnTermination: false
    # podSecurityContext:
    #   fsGroup: 0
    volumeMounts:
      - name: "data-volume"
        mountPath: "/mnt/data"
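By default the SparkKubernetesOperator only submits the SparkApplication resource; a common pattern is to pair it with a SparkKubernetesSensor from the same provider package, which polls the application status until the driver finishes. A minimal sketch, assuming the YAML above is deployed as-is (the DAG id and task ids here are made up for illustration):

```python
# Illustrative sketch only: submit the SparkApplication, then wait for it
# to reach a terminal state (COMPLETED/FAILED). Requires the
# apache-airflow-providers-cncf-kubernetes provider to be installed.
from datetime import datetime

from airflow.models import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import (
    SparkKubernetesOperator,
)
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import (
    SparkKubernetesSensor,
)

with DAG(
    "spark_pi_dag",  # hypothetical DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Creates the SparkApplication resource defined in the YAML above.
    submit = SparkKubernetesOperator(
        task_id="submit_spark_pi",
        application_file="spark_app_config.yaml",
        namespace="spark-apps",
    )

    # Polls the SparkApplication named in the YAML's metadata until the
    # driver pod completes, failing the task if the job fails.
    monitor = SparkKubernetesSensor(
        task_id="monitor_spark_pi",
        application_name="spark-pi-1",
        namespace="spark-apps",
    )

    submit >> monitor
```

This keeps submission and monitoring as separate Airflow tasks, so a failed Spark job shows up as a failed sensor task without blocking resubmission.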
Upvotes: 2
Reputation: 991
Airflow has a concept of operators, which represent Airflow tasks. In your example PythonOperator is used, which simply executes Python code and is most probably not the one you are interested in, unless you submit the Spark job within Python code. There are several operators that you can make use of:
- BashOperator - runs a shell command such as kubectl or spark-submit directly
- SparkSubmitOperator - wraps spark-submit for you
- KubernetesPodOperator - launches a pod on the cluster to run the job
Note: for each of the operators you need to ensure that your Airflow environment contains all the required dependencies for execution as well as the credentials configured to access the required services.
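To make the BashOperator route concrete, the spark-submit invocation it would wrap could be assembled as below. This is a sketch, not code from this thread: the master URL, container image, and jar path are placeholders in the style of the Spark 2.4.4 on-Kubernetes documentation.

```python
# Sketch: building the spark-submit command a BashOperator could execute.
# All concrete values (master URL, image, jar path) are placeholders.
from typing import Dict, List


def build_spark_submit_cmd(master: str, app_jar: str, main_class: str,
                           conf: Dict[str, str]) -> List[str]:
    """Assemble a spark-submit argument list for cluster mode on Kubernetes."""
    cmd = ["spark-submit",
           "--master", master,
           "--deploy-mode", "cluster",
           "--class", main_class]
    # Each Spark setting becomes a --conf key=value pair.
    for key, value in sorted(conf.items()):
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(app_jar)  # the application jar goes last
    return cmd


cmd = build_spark_submit_cmd(
    master="k8s://https://kubernetes.default.svc:443",  # placeholder API server
    app_jar="local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar",
    main_class="org.apache.spark.examples.SparkPi",
    conf={
        "spark.executor.instances": "2",
        "spark.kubernetes.container.image": "my-registry/spark:2.4.4",
    },
)
print(" ".join(cmd))
```

In a DAG, the joined string would then be passed as `bash_command` to a BashOperator, provided kubectl/spark-submit and the cluster credentials are available on the Airflow workers.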
You can also refer to the existing thread:
Upvotes: 7