Reputation: 47
So I currently have a Code Repository set up, and within this I have imported a REST API as a source (called “Gdelt2Retrieve”) . Within this REST API, I also have a webhook called ("Gdelt2) configured. I can run this in the data connection tab and successfully see the correct results from the webhook/API. I also have the egress policies set up.
I am trying to configure this webhook in a code repository, with the end goal of having this function imported into pipeline builder (This would be the input for the rest of the pipeline). If not using a function I would like to have the webhook directly in pipeline builder to run every hour or so, and dump into a dataset.
I also have the transform-external-systems setting enabled from the documentation. From my understanding, I have to call the webhook, and also have an output as a dataset.
Could somebody tell me if I am on the right track with this code? I tried to mimic the documentation:
from palantir.datasets.core import Dataset
from palantir.datasets.webhooks import WebhookClient
from pyspark.sql import function
@function(sources=["Gdelt2Retrieve"])
def call_webhook() -> str:
# Create a WebhookClient instance
webhook_client = WebhookClient()
# Execute the webhook
try:
response = webhook_client.execute("Gdelt2")
except Exception as e:
return f"Error: Webhook call failed due to an exception: {e}"
# Check if the webhook execution was successful
if response.status_code != 200:
return f"Error: Webhook call failed with status code {response.status_code}, response: {response.text}"
# Process the response data
try:
data = response.json() # Extract JSON data
except ValueError:
return "Error:"
return str(data)
Also, after importing my API: ‘Gdelt2Retrieve’, In the resources side tab it gives me this starter code:
import requests
@function(sources=["Gdelt2Retrieve"])
def my_function() -> String:
# TODO: specify endpoint url
response = requests.get(...)
if response.status_code != 200:
# Handle error
data = response.json()
# Use response data
I don’t necessarily need to use the function, but I figured this would be the easiest way to get a webhook integrated into the pipeline builder.
Upvotes: 0
Views: 65
Reputation: 1379
There is no Webhook client available in Python functions (at the moment at least). You can however trigger the API calls with requests as if you were in any python script.
You can get inspiration from Source-based external transforms there: https://www.palantir.com/docs/foundry/data-integration/external-transforms-source-based
So, something like:
@function(sources=["NameOfThesource"])
def some_function(some_input: str) -> String:
source = get_source("NameOfThesource")
endpoint = source.get_https_connection().url
key = source.get_secret("NameOfTheSecret")
...
# Use the endpoint and the secret to perform some API call.
Upvotes: 1