Reputation: 3
I have a SQL query which gives me the output below:
Select FirstName, LastName, Title
From Default.Name

FirstName   LastName   Title
Tony        Gonzalez   Mr
Tom         Brady      Mr
Patricia    Carroll    Miss
I would like to store the FirstName, LastName & Title values from each output row as variables, so I can pass them as input to my API call, which takes the names and returns the data.
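A minimal sketch of how that query output could be captured as Python variables, assuming a PySpark session named spark and that Default.Name is accessible:

rows = spark.sql("SELECT FirstName, LastName, Title FROM Default.Name").collect()

for row in rows:
    first_name = row.FirstName  # e.g. 'Tony'
    last_name = row.LastName    # e.g. 'Gonzalez'
    title = row.Title           # e.g. 'Mr'
    # first_name / last_name / title can now be substituted into the API request below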
My API call looks something like this (ignore the formatting and coding, as this is test code; the actual code I have works fine):
from zeep import Client
from zeep.transports import Transport
from requests import Session
from requests.auth import HTTPBasicAuth
from pyspark.sql.types import StructType, StructField, StringType, BooleanType
from pyspark.sql.functions import lit
from datetime import datetime

wsdl_url = "Test.xml"

session = Session()
session.auth = HTTPBasicAuth('abc', 'password')
transport = Transport(session=session)
client = Client(wsdl=wsdl_url, transport=transport)

# List all available services and ports
print(client.wsdl.services)

# Ensure the service and port names are correct
service_name = 'Test'
port_name = 'TestPort'

# Verify the available services and ports
for service in client.wsdl.services.values():
    print(f"Service: {service.name}")
    for port in service.ports.values():
        print(f"  Port: {port.name}")

# Bind to the correct service and port
service = client.bind("Test", "BasicHttpBinding_TestService")

request_data = {
    'userAuth': {
        'Nickname': "Test"
    },
    'requestReference': 'test',
    'request': {
        'Subject': {
            'Forename': 'Patricia',
            'Surname': 'Carroll'
        },
        'Address': {
            'AddressLine1': '123 Test Street',
            'AddressLine2': '',
            'AddressLine3': 'LONDON',
            'Postcode': 'ABC 123'
        },
        'ConsentFlag': True
    }
}

if service:
    response = service.PerformIDCheckV2(**request_data)

# Define the schema explicitly
schema = StructType([
    StructField("FirstName", StringType(), True),
    StructField("SurName", StringType(), True),
    StructField("response", StringType(), True),
    StructField("ETLApplyDateTime", StringType(), True)
])
You can see above where I have hardcoded Patricia's name; I want those names to be passed in as input from the SQL output above.
Upvotes: 0
Views: 43
Reputation: 3250
You can do this in PySpark: iterate over the rows of your SQL query result and pass the values dynamically to your API call.
Below is the code:
from zeep import Client
from zeep.transports import Transport
from requests import Session
from requests.auth import HTTPBasicAuth
from pyspark.sql.types import StructType, StructField, StringType
from datetime import datetime

# Execute the SQL query
df = spark.sql("SELECT FirstName, LastName FROM Default.Name")

# Initialize the API client
wsdl_url = "https://your-api-endpoint.com?wsdl"  # Ensure this is correct
session = Session()
session.auth = HTTPBasicAuth('abc', 'password')
transport = Transport(session=session)
client = Client(wsdl=wsdl_url, transport=transport)
service = client.bind("Test", "BasicHttpBinding_TestService")

# Define the output schema explicitly
schema = StructType([
    StructField("FirstName", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("Response", StringType(), True),
    StructField("ETLApplyDateTime", StringType(), True)
])

def process_partition(rows):
    """Function to process each partition in parallel"""
    import json
    results = []
    for row in rows:
        request_data = {
            'userAuth': {'Nickname': "Test"},
            'requestReference': 'test',
            'request': {
                'Subject': {'Forename': row.FirstName, 'Surname': row.LastName},
                'Address': {'AddressLine1': '123 Test Street', 'AddressLine2': '',
                            'AddressLine3': 'LONDON', 'Postcode': 'ABC 123'},
                'ConsentFlag': True
            }
        }
        try:
            response = service.PerformIDCheckV2(**request_data)
            response_str = json.dumps(response)  # Convert response to JSON string
        except Exception as e:
            response_str = f"Error: {str(e)}"  # Handle failures
        results.append((row.FirstName, row.LastName, response_str,
                        datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
    return results

# Convert the DataFrame to an RDD and make the API calls in parallel
response_rdd = df.rdd.mapPartitions(process_partition)
response_df = spark.createDataFrame(response_rdd, schema)

display(response_df)
Results:
FirstName LastName Response ETLApplyDateTime
Tony Gonzalez Error: 'NoneType' object has no attribute 'PerformIDCheckV2' 2025-02-21 09:55:09
Tom Brady Error: 'NoneType' object has no attribute 'PerformIDCheckV2' 2025-02-21 09:55:09
Patricia Carroll Error: 'NoneType' object has no attribute 'PerformIDCheckV2' 2025-02-21 09:55:09
In the above code we execute the SQL query, initialize the API client, define the schema, convert the DataFrame to an RDD for parallel processing, make the API calls in parallel, and convert the RDD back into a DataFrame.
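The Error rows above show that service was None inside process_partition, which happens when client.bind does not find a matching service/port (the WSDL URL and binding names here are placeholders); also, a zeep Client holding a live requests Session typically cannot be pickled out to Spark executors. One defensive variation, sketched here with the same placeholder names and credentials, is to build the client inside process_partition so each partition creates its own instance:

def process_partition(rows):
    """Variation: build the SOAP client on each executor instead of the driver."""
    import json
    from datetime import datetime
    from requests import Session
    from requests.auth import HTTPBasicAuth
    from zeep import Client
    from zeep.transports import Transport

    session = Session()
    session.auth = HTTPBasicAuth('abc', 'password')
    client = Client(wsdl="https://your-api-endpoint.com?wsdl",  # placeholder URL
                    transport=Transport(session=session))
    service = client.bind("Test", "BasicHttpBinding_TestService")  # placeholder names

    results = []
    for row in rows:
        request_data = {
            'userAuth': {'Nickname': "Test"},
            'requestReference': 'test',
            'request': {
                'Subject': {'Forename': row.FirstName, 'Surname': row.LastName},
                'Address': {'AddressLine1': '123 Test Street', 'AddressLine2': '',
                            'AddressLine3': 'LONDON', 'Postcode': 'ABC 123'},
                'ConsentFlag': True
            }
        }
        try:
            if service is None:
                raise ValueError("client.bind returned None - check the service/port names")
            response = service.PerformIDCheckV2(**request_data)
            response_str = json.dumps(response)
        except Exception as e:
            response_str = f"Error: {e}"
        results.append((row.FirstName, row.LastName, response_str,
                        datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
    return results

The rest of the pipeline (mapPartitions, createDataFrame, display) stays the same; only the client construction moves inside the partition function.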
Upvotes: 0