Jon
Jon

Reputation: 4967

Azure Databricks Spark SQL Query to CosmosDB getting columns from other documents

When using the Azure-Cosmosdb-spark connector to pass an sql query to CosmosDB, it seems to add columns from the other documents in the collection. There are a number of documents in the collection for example user and company which are separated by a entity type. For example the user is set up as

{   "id": "user-000003",
    "email": "someemail",
    "firstName": "firstname",
    "lastName": "lastname",
    "username": "someusername",
    "companyId": "company-000003",
    "entity": "user"
}

and the company is set up as:

{   "id": "company-000003",
    "contactName": "namegoes here",
    "addressLine1": "Address line 1",
    "addressLine2": "Address line 2",
    "entity": "company"
}

Using the Azure-Cosmosdb-spark sdk I create my connection

cosmosConfig = {
                "Endpoint" : "my endpoint goes here",
                "Masterkey" : "my key goes here",
                "Database" : "my database goes here",
                "preferredRegions" : "my region goes here",
                "Collection" : "my collection", 
                "SamplingRatio" : "1.0",
                "schema_samplesize" : "1000",
                "query_pagesize" : "2147483647",
               }

and then

set it up to use that connection

cosmosdbConnection = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**cosmosConfig).load()
cosmosdbConnection.createOrReplaceTempView("c")

I then run the query

exampleQuery= 'SELECT c.* FROM c WHERE c.entity = "user"'
users = spark.sql(exampleQuery)

I expected to get a dataframe with the columns, id, email, firstName, lastName, username, companyId and entity as defined in the user document. It is however pulling through the column names from the company document as well, but all null values. Running the same query in the Azure Cosmos DB storage explorer or azure portal, just brings back the user documents.

I can just specify the columns names I would like, but if the schema changes I'll need to add those columns.

I assume its the query? I'm looking for the way to just get the columns from the documents in the sql query. I did think that it would just pass the sql query to the cosmosdb SQL API.

This is my first time using databricks with cosmos db, have googled around, but can't seem to see what I've done wrong.

Upvotes: 0

Views: 1223

Answers (1)

Jon
Jon

Reputation: 4967

The problem is that the in the config set-up, a SQL Query on the collection hasn't been specified, it is just reading all the documents in the collection, rather than the distinct documents, that should be split by entity/theme types.

By adding SQL Query in the config

cosmosConfig = {
                "Endpoint" : "my endpoint goes here",
                "Masterkey" : "my key goes here",
                "Database" : "my database goes here",
                "preferredRegions" : "my region goes here",
                "Collection" : "my collection", 
                "SamplingRatio" : "1.0",
                "schema_samplesize" : "1000",
                "Query" : "SELECT * FROM c WHERE C.entity = 'SomeEntity"
                "query_pagesize" : "2147483647",
               }

The it will create a connections based on that. If you need to add multiple entities, create a function for example:

def createCosmosDBObject(useEntity):
  query = "SELECT * FROM c WHERE c.entity = " + "'" + useEntity + "'"
  # Create connection setting
  cosmosConfig = {
                "Endpoint" : "Kyour endpoint",
                "Masterkey" : "Your Key",
                "Database" : "Your Database",
                "preferredRegions" : "Azure Region",
                "Collection" : "Your Collection", 
                "ConnectionMode": "DirectHttps", 
                "SamplingRatio" : "1.0",
                "schema_samplesize" : "20000",
                "query_pagesize" : "2147483647",
                "query_custom" : query
               }

  createConnection = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**cosmosConfig).load()
  createConnection.createOrReplaceTempView(useEntity)

You can then call it via the document entity name:

createCosmosDBObject("customer")

It will then insert 'customer' into the query and then create a temp view that you can query, without overlap with the other document entity types in the Cosmos DB

Hope that helps

Upvotes: 0

Related Questions