Eric Bellet
Eric Bellet

Reputation: 2065

Execute a Databricks Notebook with PySpark code using Apache Airflow

I'm using Airflow, Databricks, and PySpark. I would like to know if it is possible to add more parameters when I want to execute a Databricks Notebook through Airflow.

I had the next code in Python named MyETL:

def main(**kwargs):
      spark.sql("CREATE TABLE {0} {1}".format(table, columns))
      print("Running my ETL!")

    if __name__== "__main__":
      main(arg1, arg2)

I want to define others task params that run a Databricks notebook with more params, I wanna add the name of the method, and the parameters of these methods. For example when I want to register tasks in a DAG in Airflow:

   notebook_task_params = {
        'new_cluster': new_cluster,
        'notebook_task': {
            'notebook_path': '/Users/[email protected]/MyETL',
            'method_name': 'main',
            'params':'[{'table':'A'},{'columns':['a', 'b']}]'
        },
    }

I don't know if that is possible because I didn't find similar examples.

# Example of using the JSON parameter to initialize the operator.
notebook_task = DatabricksSubmitRunOperator(
    task_id='notebook_task',
    dag=dag,
    json=notebook_task_params)

In other words, I want to execute a notebook with parameters using Airflow. My question is how can I do that?

Upvotes: 2

Views: 2725

Answers (1)

Ryan
Ryan

Reputation: 299

You can add the method_name as params also, then parse out your logic on the notebook.

However, the more common pattern here is to make sure the method is already installed on your cluster.

params = '[{'table':'A'},{'columns':['a', 'b']}]'

Then in your notebook on databricks:

table = getArgument("table", "DefaultValue")
columns = getArgument("columns", "DefaultValue")

result = method(table, columns)

You'll also know if the params are accessible with getArgument() if you can see your params (image attached above) in your notebook job run.

enter image description here

Upvotes: 2

Related Questions