Gaolai Peng

Reputation: 1

How to use map to make rest api calls in pyspark

I am trying to use map to make PUT REST API calls on an RDD, as follows:

import requests

def put(params, payload):
    # `server` is assumed to be defined elsewhere (the API host name)
    url = "https://{}/{}".format(server, params)
    headers = {"Content-Type": "application/json"}

    response = requests.request("PUT", url, headers=headers, data=payload)
    return response.status_code

df.select("params", "payload").rdd.map(lambda x, y: put(x, y)).collect()

But I am getting an error:

org.apache.spark.api.python.PythonException: 'TypeError: <lambda>() missing 1 required positional argument: 'payload''

It seems the lambda function did not receive the second parameter, payload, and I am not sure why. Can anyone help me out here?

Upvotes: 0

Views: 5182

Answers (3)

ankit p

Reputation: 11

I am not able to comment on Nihad's answer, so I am posting this as a separate answer. It addresses the follow-up question of how to merge the response back into the DataFrame.

You could do something like the following:

df_new = (
    df.select("params", "payload")
      .rdd
      .map(lambda row: (row[0], row[1], put(row[0], row[1])))
      .toDF(["params", "payload", "response"])
)

This way you can link the original data to the response.
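As a rough sketch of the same idea, the row-to-record step can be pulled into a small helper so it can be tried without a cluster. The names with_response and fake_put are hypothetical, and the helper takes the request function as an argument so that nothing here touches the network:

```python
def with_response(row, put_fn):
    """Return (params, payload, response) for one row."""
    params, payload = row[0], row[1]
    return (params, payload, put_fn(params, payload))


def fake_put(params, payload):
    # Hypothetical stand-in for the real put(); always reports success.
    return 200


# Plain tuples play the role of Spark Row objects here.
sample_rows = [("a/1", "{}"), ("b/2", "{}")]
records = [with_response(row, fake_put) for row in sample_rows]
```

On the RDD, this would presumably be used as rdd.map(lambda row: with_response(row, put)) before calling toDF.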

Upvotes: 1

Nihad TP

Reputation: 167

To add the response to the DataFrame, one option is to register the put function as a UDF and apply it with the withColumn method.

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

putUdf = udf(put, StringType())

df = df.withColumn("response", putUdf(df.params, df.payload))

This creates a new column called response and fills it with the output of put.

Upvotes: -1

Nihad TP

Reputation: 167

The piece of code below is the culprit:

df.select("params", "payload").rdd.map(lambda x, y: put(x, y)).collect()

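You cannot unpack a DataFrame row in the lambda's parameter list. map passes each Row to the function as a single argument, so a two-parameter lambda never receives its second argument. This is the correct version: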

df.select("params", "payload").rdd.map(lambda row: put(row[0], row[1])).collect()
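To illustrate why the two-parameter lambda fails, here is a minimal sketch that uses Python's built-in map over plain tuples instead of an RDD of Row objects. The sample data and fake_put are made up for illustration. The behavior carries over because RDD.map also calls the function with one element at a time, and Row is a tuple subclass:

```python
def fake_put(params, payload):
    # Hypothetical stand-in for the real put(); makes no network call.
    return "{}:{}".format(params, payload)


# Each element plays the role of one Row holding (params, payload).
rows = [("a/1", "{}"), ("b/2", "{}")]

# map() hands each element to the function as ONE argument,
# so a two-parameter lambda is missing its second argument.
try:
    list(map(lambda x, y: fake_put(x, y), rows))
    failed = False
except TypeError:
    failed = True

# Take the whole row as one parameter and index into it...
by_index = list(map(lambda row: fake_put(row[0], row[1]), rows))

# ...or star-unpack the row when calling the function.
by_unpack = list(map(lambda row: fake_put(*row), rows))
```

Either form should work in the Spark call above. The star-unpacking variant assumes the selected columns are in the same order as the parameters of put.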

Upvotes: 1
