Gero

Reputation: 2967

Azure Databricks: create audit trail for who ran what query at what moment

We have an audit requirement to provide insight into who executed what query at what moment in Azure Databricks. The Azure Databricks / Spark UI / Jobs tab already lists the Spark jobs that were executed, including the query that was run and the time it was submitted, but it does not include who executed the query.

  1. Is there an API we can use with Azure Databricks to query these Spark job details shown in the UI? (The Databricks REST API does not seem to provide this, but maybe I'm overlooking something)
  2. Is there a way we can determine (using an API) who created a Spark job?

Thanks, Gero

Upvotes: 2

Views: 907

Answers (2)

nenetto

Reputation: 441

Following Douglas's answer, I came up with this function to use inside Databricks notebooks to get some information about the cached RDDs:

I hope it helps; I spent about two hours looking for information on this topic.

def get_databricks_rdd_info():
    import requests, json
    import pyspark.sql.functions as F
    import pyspark.sql.types as T

    # Get the Spark context
    sc = spark.sparkContext
    # Get the application id (the notebook is attached to it)
    app_id = sc._jsc.sc().applicationId()
    # Find the driver host and the Spark UI port
    driver_ip = spark.conf.get('spark.driver.host')
    port = spark.conf.get('spark.ui.port')
    # Compose the query to the Spark UI API
    url = f"http://{driver_ip}:{port}/api/v1/applications/{app_id}/storage/rdd"

    # Make the request
    r = requests.get(url, timeout=3.0)

    if r.status_code == 200:
        # Load the JSON response into a DataFrame and infer its schema
        df = spark.createDataFrame([json.dumps(rdd) for rdd in r.json()], T.StringType())
        json_schema = spark.read.json(df.rdd.map(lambda row: row.value)).schema
        df = df.withColumn('value', F.from_json(F.col('value'), json_schema))
        df = df.selectExpr('value.*')

        # Generate a summary
        df_summary = (df
                      .withColumn('Name', F.element_at(F.split(F.col('name'), ' '), -1))
                      .withColumn('Cached %', F.round(F.lit(100) * F.col('numCachedPartitions') / F.col('numPartitions'), 2))
                      .withColumn('Memory GB', F.round(F.col('memoryUsed') * 1e-9, 2))
                      .withColumn('Disk GB', F.round(F.col('diskUsed') * 1e-9, 2))
                      .withColumnRenamed('numPartitions', '# Partitions')
                      .select([
                          'Name',
                          'id',
                          'Cached %',
                          'Memory GB',
                          'Disk GB',
                          '# Partitions',
                      ]))
    else:
        print('Request failed, status code:', r.status_code)
        df = None
        df_summary = None

    return df, df_summary


You can use it inside a Databricks notebook as:

df_rdd_info, df_summary = get_databricks_rdd_info()
display(df_summary)

Upvotes: 0

Douglas M

Reputation: 1126

1. Access the Spark API

a. Driver node (internal) access to the Azure Databricks Spark API:

import requests

# Resolve the driver host and the Spark UI port from the cluster configuration
driverIp = spark.conf.get('spark.driver.host')
port = spark.conf.get('spark.ui.port')
url = f"http://{driverIp}:{port}/api/v1/applications"
r = requests.get(url, timeout=3.0)
r.status_code, r.text
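
From inside the driver you can then walk the same endpoints to pull the per-job details shown in the Jobs tab. A minimal sketch, reusing driverIp, port and the r from the request above (the field names follow the standard Spark monitoring REST API):

# For each application on this cluster, list its jobs with their submission times
for app in r.json():
    jobs_url = f"http://{driverIp}:{port}/api/v1/applications/{app['id']}/jobs"
    for job in requests.get(jobs_url, timeout=3.0).json():
        print(job['jobId'], job.get('submissionTime'), job['status'], job['name'])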

If you instead call this API from outside the cluster, you may receive the error PERMISSION_DENIED: Traffic on this port is not permitted. In that case, use the external access method below.

b. External access to the Azure Databricks Spark API:

import requests
import json
"""
  Programmatic access to the Databricks Spark UI.

  Works from outside the Databricks environment as well as from within it.
  Requires a personal access token. Treat this like a password; do not store it in a notebook. Please refer to the Secrets API.
  This Python code requires f-string support (Python 3.6+).

"""

# https://<databricks-host>/driver-proxy-api/o/0/<cluster_id>/<port>/api/v1/applications/<application-id-from-master-spark-ui>/stages/<stage-id>
port = spark.conf.get("spark.ui.port")
clusterId = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")
host = "eastus2.azuredatabricks.net"
workspaceId = "999999999999111"  # follows the 'o=' in the Databricks URLs, or zero
token = "dapideedeadbeefdeadbeefdeadbeef68ee3"  # personal access token

url = f"https://{host}/driver-proxy-api/o/{workspaceId}/{clusterId}/{port}/api/v1/applications/?status=running"
r = requests.get(url, auth=("token", token))

# Print the application list response
print(r.status_code, r.text)

applicationId = r.json()[0]['id']  # assumes only one running application

url = f"https://{host}/driver-proxy-api/o/{workspaceId}/{clusterId}/{port}/api/v1/applications/{applicationId}/jobs"
r = requests.get(url, auth=("token", token))

print(r.status_code, r.json())
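
Per the standard Spark monitoring API, each entry in the jobs response carries fields such as jobId, name, submissionTime and status: the "what query, at what moment" half of the audit trail, but nothing that identifies the user.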

2. Sorry, no, not at this time.

The cluster logs would be where you'd look, but the user identity is not there.
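
For completeness, the closest thing exposed today is the cluster event log via the Clusters REST API. A minimal sketch, reusing the host, token and clusterId values from above; note it returns cluster lifecycle events (create, start, resize, terminate), not per-query user identity:

import requests

# Fetch the event log for one cluster via the Databricks Clusters API
url = f"https://{host}/api/2.0/clusters/events"
r = requests.post(url, auth=("token", token), json={"cluster_id": clusterId})
for event in r.json().get("events", []):
    print(event["timestamp"], event["type"])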

To vote for and track this idea: https://ideas.databricks.com/ideas/DBE-I-313
How to get to the Ideas portal: https://docs.databricks.com/ideas.html

Upvotes: 2
