John Bryan

Reputation: 3

How to store SQL query output columns as variables to be used as parameters for an API data call in Databricks

I have a SQL query which gives me the output below:

SELECT FirstName, LastName, Title
FROM Default.Name

FirstName    LastName    Title
Tony         Gonzalez    Mr
Tom          Brady       Mr
Patricia     Carroll     Miss

I would like to store the FirstName, LastName and Title values from each output row as variables, so I can pass them as inputs to my API data call, which takes the names and returns the data.
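Conceptually, what I am after is something like this (just a rough sketch on my side, not working code; it assumes the Default.Name table is reachable from the notebook):

rows = spark.sql("SELECT FirstName, LastName, Title FROM Default.Name").collect()

for row in rows:
    first_name = row["FirstName"]   # e.g. "Tony"
    last_name = row["LastName"]     # e.g. "Gonzalez"
    title = row["Title"]            # e.g. "Mr"
    # first_name / last_name would then go into the API request below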

My API call looks something like this (ignore the formatting and naming; this is test code, and the actual code I have works fine):

from zeep import Client
from zeep.transports import Transport
from requests import Session
from requests.auth import HTTPBasicAuth
from pyspark.sql.types import StructType, StructField, StringType, BooleanType
from pyspark.sql.functions import lit
from datetime import datetime

wsdl_url = "Test.xml"

session = Session()
session.auth = HTTPBasicAuth('abc','password')

transport = Transport(session=session)

client = Client(wsdl=wsdl_url, transport=transport)

# List all available services and ports
print(client.wsdl.services)

# Ensure the service and port names are correct
service_name = 'Test'
port_name = 'TestPort'

# Verify the available services and ports
for service in client.wsdl.services.values():
    print(f"Service: {service.name}")
    for port in service.ports.values():
        print(f"  Port: {port.name}")

# Bind to the correct service and port
service = client.bind("Test", "BasicHttpBinding_TestService")

request_data = {
    'userAuth': {
        'Nickname':"Test"
    },
    'requestReference': 'test',
    'request': {
        'Subject': {
            'Forename': 'Patricia',
            'Surname': 'Carroll'
        },
        'Address': {
            'AddressLine1': '123 Test Street',
            'AddressLine2': '',
            'AddressLine3': 'LONDON',
            'Postcode': 'ABC 123'
        },
        'ConsentFlag': True
    }
}

if service:
    response = service.PerformIDCheckV2(**request_data)
    
    # Define the schema explicitly
    schema = StructType([
        StructField("FirstName", StringType(), True),
        StructField("SurName", StringType(), True),
        StructField("response", StringType(), True),
        StructField("ETLApplyDateTime", StringType(), True)
    ])

You can see above where I have hardcoded Patricia's name; I want those names to be passed in as input from the SQL output above.

Upvotes: 0

Views: 43

Answers (1)

You can do this in PySpark: iterate over the rows of your SQL query result and pass the values to your API call dynamically.

Below is the code:

from datetime import datetime
from requests import Session
from requests.auth import HTTPBasicAuth
from zeep import Client
from zeep.transports import Transport
from pyspark.sql.types import StructType, StructField, StringType

# Run the SQL query with the columns needed for the API call
df = spark.sql("SELECT FirstName, LastName FROM Default.Name")

# Build the SOAP client (same setup as in your question)
wsdl_url = "https://your-api-endpoint.com?wsdl"  # Ensure this is correct
session = Session()
session.auth = HTTPBasicAuth('abc', 'password')
transport = Transport(session=session)
client = Client(wsdl=wsdl_url, transport=transport)
service = client.bind("Test", "BasicHttpBinding_TestService")
schema = StructType([
    StructField("FirstName", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("Response", StringType(), True),
    StructField("ETLApplyDateTime", StringType(), True)
])
def process_partition(rows):
    """Function to process each partition in parallel"""
    import json    
    results = []
    for row in rows:
        request_data = {
            'userAuth': {'Nickname': "Test"},
            'requestReference': 'test',
            'request': {
                'Subject': {'Forename': row.FirstName, 'Surname': row.LastName},
                'Address': {'AddressLine1': '123 Test Street', 'AddressLine2': '', 'AddressLine3': 'LONDON', 'Postcode': 'ABC 123'},
                'ConsentFlag': True
            }
        }        
        try:
            response = service.PerformIDCheckV2(**request_data)
            response_str = json.dumps(response)  # Convert response to JSON string
        except Exception as e:
            response_str = f"Error: {str(e)}"  # Handle failures

        results.append((row.FirstName, row.LastName, response_str, datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
    return results
response_rdd = df.rdd.mapPartitions(process_partition)
response_df = spark.createDataFrame(response_rdd, schema)
display(response_df)
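One thing to watch (an assumption on my side about the response object, not something in the code above): json.dumps can fail on the raw zeep response. If it does, converting the response to plain Python types first with zeep's serialize_object helper is one option:

from zeep.helpers import serialize_object
import json

# Convert the zeep response to plain dicts/lists before serialising;
# default=str takes care of datetimes and other non-JSON types.
response_str = json.dumps(serialize_object(response), default=str)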

Results:

FirstName   LastName   Response                                                         ETLApplyDateTime
Tony        Gonzalez   Error: 'NoneType' object has no attribute 'PerformIDCheckV2'    2025-02-21 09:55:09
Tom         Brady      Error: 'NoneType' object has no attribute 'PerformIDCheckV2'    2025-02-21 09:55:09
Patricia    Carroll    Error: 'NoneType' object has no attribute 'PerformIDCheckV2'    2025-02-21 09:55:09

In the above code we are executing the SQL query, initializing the API client, defining the schema, converting the DataFrame to an RDD for parallel processing, processing the API calls in parallel, and converting the RDD back to a DataFrame.
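About the 'NoneType' errors in the results: they mean service ended up as None, i.e. client.bind(...) had nothing to bind against the placeholder WSDL, so the WSDL URL and the service/port names need to point at the real endpoint. Also, zeep clients generally are not picklable, so on a multi-node cluster it can be safer to build the client inside the partition function rather than on the driver. A rough sketch of that variation (reusing the placeholder URL, credentials and names from above, which are assumptions):

def process_partition(rows):
    """Build the SOAP client on the executor so nothing unpicklable is captured from the driver."""
    import json
    from datetime import datetime
    from requests import Session
    from requests.auth import HTTPBasicAuth
    from zeep import Client
    from zeep.transports import Transport

    session = Session()
    session.auth = HTTPBasicAuth('abc', 'password')
    client = Client(wsdl="https://your-api-endpoint.com?wsdl",
                    transport=Transport(session=session))
    service = client.bind("Test", "BasicHttpBinding_TestService")

    results = []
    for row in rows:
        if service is None:
            # bind() found nothing to bind - check the WSDL URL and service/port names
            response_str = "Error: bind returned None"
        else:
            try:
                response = service.PerformIDCheckV2(
                    userAuth={'Nickname': 'Test'},
                    requestReference='test',
                    request={
                        'Subject': {'Forename': row.FirstName, 'Surname': row.LastName},
                        'Address': {'AddressLine1': '123 Test Street', 'AddressLine2': '',
                                    'AddressLine3': 'LONDON', 'Postcode': 'ABC 123'},
                        'ConsentFlag': True
                    }
                )
                response_str = json.dumps(response, default=str)
            except Exception as e:
                response_str = f"Error: {e}"
        results.append((row.FirstName, row.LastName, response_str,
                        datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
    return results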

Upvotes: 0
