porter22

Reputation: 63

Pass parameters/arguments to HDInsight/Spark Activity in Azure Data Factory

I have an on-demand HDInsight cluster that is launched from a Spark Activity within Azure Data Factory and runs PySpark 3.1. To test my code, I normally launch a Jupyter Notebook from the created HDInsight cluster's page.

Now, I would like to pass some parameters to that Spark activity and retrieve these parameters from within the Jupyter notebook code. I've tried doing so in two ways, but neither of them worked for me:

Method A: passed them as Arguments in the Spark activity and then tried to retrieve them using sys.argv.

Method B: passed them as Spark configuration and then tried to retrieve them using sc.getConf().getAll().
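
Roughly, the two retrieval attempts in the notebook looked like this (a minimal sketch; the parameter name and value are just placeholders, not my actual names):

from pyspark import SparkContext
import sys

# Method A: expected the activity's Arguments to show up in sys.argv
print(sys.argv)                  # e.g. ['...', 'myParamValue'] is what I hoped for

# Method B: expected a custom key (e.g. spark.myParam) among the Spark conf entries
sc = SparkContext.getOrCreate()  # in the HDInsight notebook, sc already exists
print(sc.getConf().getAll())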

I suspect that either I'm passing the parameters incorrectly from the pipeline, or I'm retrieving them the wrong way from the notebook.

Any pointers on how to pass parameters into the HDInsight Spark activity within Azure Data Factory would be much appreciated.


Upvotes: 0

Views: 332

Answers (1)

Saideep Arikontham

Reputation: 6104

The issue is with the entryFilePath. In the Spark activity of the HDInsight cluster, the entryFilePath must point to either a .jar file or a .py file. When you do this, you can successfully pass arguments to the job and read them using sys.argv.

  • The following is an example of how you can pass arguments to a Python script.

(screenshot: Spark activity settings with a .py entry file and an argument value)

  • The code inside nb1.py (sample) is as shown below:
from pyspark import SparkContext
from pyspark.sql import *
import sys

sc = SparkContext()
sqlContext = HiveContext(sc)

# Create an RDD from sample data which is already available
hvacText = sc.textFile("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")

# Create a schema for our data
Entry = Row('Date', 'Time', 'TargetTemp', 'ActualTemp', 'BuildingID')
# Parse the data and create a schema
hvacParts = hvacText.map(lambda s: s.split(',')).filter(lambda s: s[0] != 'Date')
hvac = hvacParts.map(lambda p: Entry(str(p[0]), str(p[1]), int(p[2]), int(p[3]), int(p[6])))

# Infer the schema and create a table       
hvacTable = sqlContext.createDataFrame(hvac)
hvacTable.registerTempTable('hvactemptable')
dfw = DataFrameWriter(hvacTable)

# Using the argument passed from the pipeline to name the table
dfw.saveAsTable(sys.argv[1])
  • When the pipeline is triggered, it runs successfully and the required table is created (the table name is passed as an argument from the pipeline's Spark activity). We can then query this table in the HDInsight cluster's Jupyter notebook using the following query:
select * from new_hvac

(screenshot: query results for new_hvac shown in the Jupyter notebook)
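
If you prefer to run that check from a PySpark cell rather than an SQL cell, a minimal sketch would be (assuming the notebook's PySpark kernel exposes the usual spark session; new_hvac is the table name passed from the pipeline in this example):

# Query the Hive table created by the pipeline-run script
spark.sql("select * from new_hvac").show()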

NOTE:

So, please ensure that you are passing the arguments to a Python script (.py file), not to a Python notebook.
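
As a minimal sketch of that pattern (the argument name below is a placeholder, not from the pipeline above), the entry .py file can guard against a missing argument before using it:

import sys

# The Spark activity's Arguments list is appended to sys.argv,
# so sys.argv[1] holds the first value passed from the pipeline.
if len(sys.argv) < 2:
    raise SystemExit("Expected a table name argument from the ADF pipeline")

table_name = sys.argv[1]
print(f"Creating table: {table_name}")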

Upvotes: 0
