Reputation: 2065
I'm using Airflow, Databricks, and PySpark. I would like to know if it is possible to add more parameters when I want to execute a Databricks Notebook through Airflow.
I have the following Python code in a notebook named MyETL:
def main(**kwargs):
    table = kwargs["table"]
    columns = kwargs["columns"]
    spark.sql("CREATE TABLE {0} {1}".format(table, columns))
    print("Running my ETL!")

if __name__ == "__main__":
    main(table=arg1, columns=arg2)
I want to define other task params that run the Databricks notebook with more parameters: I want to add the name of the method, and the parameters of that method. For example, when I register the task in a DAG in Airflow:
notebook_task_params = {
    'new_cluster': new_cluster,
    'notebook_task': {
        'notebook_path': '/Users/[email protected]/MyETL',
        'method_name': 'main',
        'params': [{'table': 'A'}, {'columns': ['a', 'b']}]
    },
}
I don't know if that is possible because I didn't find similar examples.
# Example of using the JSON parameter to initialize the operator.
notebook_task = DatabricksSubmitRunOperator(
    task_id='notebook_task',
    dag=dag,
    json=notebook_task_params)
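For completeness, this assumes a surrounding DAG and a new_cluster spec roughly like the sketch below; the cluster values are only illustrative, and the import path depends on the Airflow version (on Airflow 1.x it lives in airflow.contrib.operators.databricks_operator).

from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

# Minimal DAG definition that the snippet above plugs into
dag = DAG(
    dag_id='my_etl_dag',
    start_date=datetime(2019, 1, 1),
    schedule_interval=None,
)

# Illustrative job cluster spec; adjust Spark version, node type, and size as needed
new_cluster = {
    'spark_version': '5.3.x-scala2.11',
    'node_type_id': 'i3.xlarge',
    'num_workers': 2,
}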
In other words, I want to execute a notebook with parameters using Airflow. How can I do that?
Upvotes: 2
Views: 2725
Reputation: 299
You can pass the method_name as one of the params as well, and then parse out your logic in the notebook.
However, the more common pattern here is to make sure the method is part of a library that is already installed on your cluster.
params = [{'table': 'A'}, {'columns': ['a', 'b']}]
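For reference, when the job goes through DatabricksSubmitRunOperator's json argument, notebook parameters are carried under notebook_task.base_parameters in the Runs Submit payload. A sketch of the question's payload in that shape could look like the following; JSON-encoding the column list is just one way to keep the values as plain strings, not something required by the API.

import json

# Sketch: same parameters, carried as notebook_task.base_parameters.
# base_parameters values are treated as strings, so the list of columns
# is JSON-encoded here and decoded again inside the notebook.
notebook_task_params = {
    'new_cluster': new_cluster,
    'notebook_task': {
        'notebook_path': '/Users/[email protected]/MyETL',
        'base_parameters': {
            'method_name': 'main',
            'table': 'A',
            'columns': json.dumps(['a', 'b']),
        },
    },
}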
Then, in your notebook on Databricks:
# getArgument() reads the values passed to the notebook run, with a fallback default
table = getArgument("table", "DefaultValue")
columns = getArgument("columns", "DefaultValue")
result = method(table, columns)  # method is the callable installed on your cluster
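If you also pass method_name as suggested above, a minimal way to dispatch on it inside the notebook could look like this; the methods dict and the JSON decoding of columns are illustrative assumptions, not part of the Databricks API.

import json

# Read the parameters passed to the run, with fallbacks for interactive testing
method_name = getArgument("method_name", "main")
table = getArgument("table", "A")
columns = json.loads(getArgument("columns", '["a", "b"]'))

# Map the allowed method names to callables; main could be defined in the
# notebook itself or imported from a library installed on the cluster
methods = {"main": main}
result = methods[method_name](table=table, columns=columns)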
You'll also know whether the params are accessible with getArgument() if you can see them listed in the notebook job run in the Databricks UI.
Upvotes: 2