Reputation: 1
I am trying to use map to make PUT REST API calls on an RDD, like the following:
import requests

def put(params, payload):
    url = "https://{}/{}".format(server, params)
    headers = {'Content-Type': 'application/json'}
    response = requests.request("PUT", url, headers=headers, data=payload)
    return response.status_code
df.select("params", "payload").rdd.map(lambda x, y: put(x, y)).collect()
But I am getting an error:
org.apache.spark.api.python.PythonException: 'TypeError: <lambda>() missing 1 required positional argument: 'payload''
It seems like the lambda function didn't receive the second argument, payload, and I'm not sure why. Can anyone help me out here?
Upvotes: 0
Views: 5182
Reputation: 11
I am not able to add a comment to Nihad's answer, so I'm posting this as an answer to Nihad's answer and to the follow-up question of how to merge the response back into the DataFrame.
You could possibly do something like the following:
df_new = (df.select("params", "payload")
            .rdd
            .map(lambda row: [row[0], row[1], put(row[0], row[1])])
            .toDF())
This way you can link the original data to the response.
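If you also want the result columns to have meaningful names rather than the default _1, _2, _3, a minimal sketch would be the following (the names "params" and "payload" come from the question; "response" is just an assumed name for the new column):

df_new = (df.select("params", "payload")
            .rdd
            .map(lambda row: (row[0], row[1], put(row[0], row[1])))
            .toDF(["params", "payload", "response"]))

df_new.show(truncate=False)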
Upvotes: 1
Reputation: 167
In order to add the response to the DataFrame, you would have to register the put method as a UDF and use it in the DataFrame's withColumn method.
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

# Register put as a UDF, then call it per row to build a "response" column
putUdf = udf(put, StringType())
df = df.withColumn("response", putUdf(df.params, df.payload))
This creates a new column called response and fills it with the output of put.
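For reference, here's a self-contained sketch of the same approach. The server value and the sample rows are illustrative assumptions, and put returns the status code as a string so that it matches the StringType declared for the UDF:

import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

server = "api.example.com"  # assumption: replace with your real host

def put(params, payload):
    url = "https://{}/{}".format(server, params)
    headers = {'Content-Type': 'application/json'}
    response = requests.request("PUT", url, headers=headers, data=payload)
    # Return a string so it matches the StringType declared for the UDF.
    return str(response.status_code)

# Illustrative input rows; real data would come from your own DataFrame.
df = spark.createDataFrame(
    [("resource/1", '{"k": "v1"}'), ("resource/2", '{"k": "v2"}')],
    ["params", "payload"],
)

putUdf = udf(put, StringType())
df = df.withColumn("response", putUdf(df.params, df.payload))
df.show(truncate=False)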
Upvotes: -1
Reputation: 167
The piece of code below is the culprit:
df.select("params", "payload").rdd.map(lambda x, y: put(x, y)).collect()
You cannot unpack the Row into multiple lambda arguments: rdd.map passes each element (a Row) as a single argument. This is the correct version:
df.select("params", "payload").rdd.map(lambda row: put(row[0], row[1])).collect()
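Equivalently, you can access the Row fields by name, which may read a little more clearly (this assumes the columns are called params and payload, as in the question):

# Each RDD element is a single Row; access its fields by name.
statuses = (df.select("params", "payload")
              .rdd
              .map(lambda row: put(row["params"], row["payload"]))
              .collect())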
Upvotes: 1