Omkar Khair

Cosmos DB spatial query using Spark

I would like to query a Cosmos DB collection using a spatial query, specifically one that filters with ST_DISTANCE. This query works as intended using the azure-cosmos Python SDK.

I want to run this query through Apache Spark to support a more complex query pattern. However, using ST_DISTANCE in a SQL cell of the notebook results in the following error:

Error in SQL statement: AnalysisException: Undefined function: 'ST_DISTANCE'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.

The notebook is initialized as follows.

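# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# Read config for the container; assumes cosmosDatabaseName and
# cosmosContainerName are defined alongside the endpoint and key
cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName
}

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()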

df.createOrReplaceTempView("outlets")

_______________________________________________________________________

%sql
SELECT * FROM outlets f WHERE ST_DISTANCE(f.boundary, POINT(0,0)) < 600


Answers (1)

Omkar Khair

Based on what I understand from the Cosmos DB Spark connector GitHub repo [1], not all Cosmos DB query filters are supported via the connector (yet?). A %sql cell is parsed and executed by Spark SQL, which, as the error shows, has no function named ST_DISTANCE. The connector can only push down predicates that Spark itself understands and translates them into Cosmos DB SQL, so ST_DISTANCE and the other spatial functions aren't going to work this way.
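Because Spark SQL cannot evaluate ST_DISTANCE, another option (not what the connector or Cosmos DB does) is to compute the distance in Spark after the data is loaded. The sketch below uses the haversine formula; haversine_m and EARTH_RADIUS_M are illustrative names, and the spherical-Earth assumption means results may differ slightly from Cosmos DB's ST_DISTANCE. Nothing is pushed down, so the whole container is read before filtering.

```python
import math

# Mean Earth radius in metres (spherical approximation, an assumption here).
EARTH_RADIUS_M = 6_371_008.8


def haversine_m(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
    """Great-circle distance in metres between two (lon, lat) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    # Haversine of the central angle between the two points.
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))
```

In a notebook it could be registered with spark.udf.register("haversine_m", haversine_m, "double") and called from a %sql cell, assuming the point's longitude and latitude are available as numeric columns.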

I found a workaround, at least for now. The query config [2] allows sending a custom query directly to Cosmos DB, and a temporary view can then be built over the result and queried. This won't cover every use case, but it solved my problem, where I needed a single view with the distance filtering already applied. The rest can be handled in Spark SQL.

See spark.cosmos.read.customQuery [2] in the sample below.

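# Cosmos DB runs the custom query as-is, so ST_DISTANCE is evaluated server-side.
# ST_DISTANCE returns metres; GeoJSON coordinates are [longitude, latitude].
outlets_cfg = {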
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName,
  "spark.cosmos.read.customQuery" : "SELECT * FROM c WHERE ST_DISTANCE(c.location,{\"type\":\"Point\",\"coordinates\": [12.832489, 18.9553242]}) < 1000"
}

df = spark.read.format("cosmos.oltp").options(**outlets_cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

df.createOrReplaceTempView("outlets")
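If the point or radius changes between runs, hand-escaping the GeoJSON inside the query string gets error-prone. A minimal sketch, assuming a hypothetical helper build_distance_query (not part of the connector), generates the same kind of custom query with json.dumps. The field name is interpolated as-is, so it should only come from trusted code.

```python
import json


def build_distance_query(field: str, lon: float, lat: float, max_meters: float) -> str:
    """Hypothetical helper: build a Cosmos DB SQL query keeping documents whose
    GeoJSON `field` lies within `max_meters` of the point (lon, lat)."""
    # GeoJSON orders coordinates as [longitude, latitude].
    point = json.dumps({"type": "Point", "coordinates": [lon, lat]})
    # ST_DISTANCE returns metres; `field` is not escaped, so keep it trusted.
    return f"SELECT * FROM c WHERE ST_DISTANCE(c.{field}, {point}) < {max_meters}"
```

It would then be used as outlets_cfg["spark.cosmos.read.customQuery"] = build_distance_query("location", 12.832489, 18.9553242, 1000) before calling spark.read.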

[1] https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-spark_3-1_2-12/

[2] https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md#query-config
