Reputation: 63
I have a Main pipeline in Synapse workspace which has 2 activities:
1st - Notebook activity
2nd - If Condition activity
For the 1st one (Synapse notebook, spark pool, pyspark), I have a SQL cell like the following:
It has a simple query using a join:
%%sql
SELECT A.name FROM A
LEFT JOIN B ON A.id = B.id
This will return some rows to me (< 50 rows)
Now I want to access this result set of less than 50 rows in the "If Condition" activity in Synapse Pipeline,
How do I perform this?
According to docs, I should be able to use the following:
@activity(‘Notebook1’).output.status.Output.result.exitValue
But the exitValue that I'm getting in the synapse notebook output is null. How do I access this result set in "If Condition" then?
Upvotes: 1
Views: 2658
Reputation: 6104
You have to return the value from notebook using mssparkutils.notebook.exit
to access it from your pipeline using @activity(‘Notebook1’).output.status.Output.result.exitValue
.
Instead of using an SQL cell, you can use spark.sql
and store the result in a dataframe using df = spark.sql(Query)
.
You can either choose to return the entire dataframe data to pipeline, or just the number of records using dataframe.count()
(if you want to verify the count of records).
mssparkutils.notebook.exit(str(df.count())) #where df is the dataframe
x = df.toPandas()
json = x.to_json(orient = 'records' )
mssparkutils.notebook.exit(json)
@activity('Notebook1').output.status.Output.result.exitValue
@json
method.Upvotes: 2
Reputation: 63
So 1 way I found is to explicitly attach an output to the exitValue of the Notebook after creating a Dataframe by doing the following:
Following are the 4 cells that I have in "Notebook1" :
%%sql
CREATE OR REPLACE A TEMP VIEW Result AS
SELECT A.name FROM A
LEFT JOIN B ON A.id = B.id
#Converting the result set to a dataframe
Query = "SELECT * FROM Result"
dfResult = spark.sql(Query)
#Converting the dataframe column that I want to use further to a List
resultVariable = dfResult.select(name).rdd.flatMap(lambda x:x).collect()
Refer this to convert a dataframe column to list
#Attaching the List to exitValue in Output
mssparkutils.notebook.exit(resultVariable)
Then, in the Expression of the "If Condition" activity we can use the following to access this result:
activity(Notebook1).output.status.Output.result.exitValue
So, in this way inside the Expression now you will be able to access the List that you have passed earlier in the exitValue
This is one approach that I found. Would be happy to know if there is an easier way to do this.
Upvotes: 1