Rafael

Reputation: 671

Move JSON Data from DocumentDB (or CosmosDB) to Azure Data Lake

I have a lot of JSON documents (millions of them) in Cosmos DB (formerly called Document DB) and I want to move them into Azure Data Lake for cold storage.

I found this document https://learn.microsoft.com/en-us/dotnet/api/microsoft.azure.documents.client.documentclient.readdocumentfeedasync?view=azure-dotnet but it doesn't have any samples to start with.

How should I proceed? Any code samples are highly appreciated.

Thanks.

Upvotes: 2

Views: 3937

Answers (4)

Hauke Mallow

Reputation: 3182

You could also read the change feed via Spark. The following Python code example generates Parquet files partitioned by load date for changed data. It works in an Azure Databricks notebook on a daily schedule:

    # Get DB secrets
    endpoint = dbutils.secrets.get(scope = "cosmosdb", key = "endpoint")
    masterkey = dbutils.secrets.get(scope = "cosmosdb", key = "masterkey")

    # database & collection
    database = "<yourdatabase>"
    collection = "<yourcollection"

    # Configs
    dbConfig = {
    "Endpoint" : endpoint,
    "Masterkey" : masterkey,
    "Database" : database,
    "Collection" : collection, 
    "ReadChangeFeed" : "True",
    "ChangeFeedQueryName" : database + collection + " ",
    "ChangeFeedStartFromTheBeginning" : "False",
    "ChangeFeedUseNextToken" : "True",
    "RollingChangeFeed" : "False",
    "ChangeFeedCheckpointLocation" : "/tmp/changefeedcheckpointlocation",
    "SamplingRatio" : "1.0"
    }

    # Connect via Spark connector to create Spark DataFrame
    df = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**dbConfig).load()     

    # Set partition to current date
    import datetime
    from pyspark.sql.functions import lit

    partition_day = datetime.date.today()
    partition_datetime = datetime.datetime.now().isoformat()

    # New dataframe with ingest date (= partition key)
    df_part = df.withColumn("ingest_date", lit(partition_day))

    # Write parquet files partitioned by ingest date
    df_part.write.partitionBy('ingest_date').mode('append').parquet('dir')
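
If the target is an Azure Data Lake Storage Gen2 account rather than DBFS, the same write can point at an abfss:// path. The storage account, container, and key below are placeholders, not part of the original answer:

    # Assumed ADLS Gen2 destination; set up access first, e.g. with a storage account key
    spark.conf.set("fs.azure.account.key.<storageaccount>.dfs.core.windows.net", "<storage-key>")

    # Write the partitioned parquet output directly into the lake
    output_path = "abfss://<container>@<storageaccount>.dfs.core.windows.net/cosmos-archive"
    df_part.write.partitionBy('ingest_date').mode('append').parquet(output_path)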

Upvotes: 1

Howard Edidin

Reputation: 63

You could also use a Logic App with a timer trigger. This would be a no-code solution:

  1. Query for documents
  2. Loop through documents
  3. Add to Data Lake

The advantage is that you can apply any rules to the data before sending it to Data Lake.

Upvotes: 0

Jay Gong

Reputation: 23782

I suggest using Azure Data Factory to implement your requirement.

Please refer to this doc about how to export JSON documents from Cosmos DB and this doc about how to import data into Azure Data Lake.

Hope it helps you.


Updated answer:

Please refer to this: Azure Cosmos DB as source. You can create a query in the pipeline.

Upvotes: 2

Nick Chapsas

Reputation: 7200

Yeah the change feed will do the trick.

You have two options. The first one (which is probably what you want in this case) is to use it via the SDK.

Microsoft has a detailed page on how to do this, including code examples, here: https://learn.microsoft.com/en-us/azure/cosmos-db/change-feed#rest-apis
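
For the SDK route, here is a minimal sketch of the idea in Python, using the current azure-cosmos and azure-storage-file-datalake packages rather than the .NET SDK the question links to; the account URLs, keys, batch size, and file paths are placeholder assumptions:

    import json
    from azure.cosmos import CosmosClient
    from azure.storage.filedatalake import DataLakeServiceClient

    # Cosmos DB side (placeholder account and key)
    cosmos = CosmosClient("https://<account>.documents.azure.com:443/", credential="<cosmos-key>")
    container = cosmos.get_database_client("<database>").get_container_client("<collection>")

    # Data Lake Storage Gen2 side (placeholder account and key)
    lake = DataLakeServiceClient(
        account_url="https://<storageaccount>.dfs.core.windows.net",
        credential="<storage-key>")
    filesystem = lake.get_file_system_client("cold-storage")

    # Read the change feed from the beginning and archive the documents in batches
    batch, batch_no = [], 0
    for doc in container.query_items_change_feed(is_start_from_beginning=True):
        batch.append(doc)
        if len(batch) >= 1000:
            filesystem.get_file_client(f"cosmos-archive/batch-{batch_no:06}.json") \
                      .upload_data(json.dumps(batch), overwrite=True)
            batch, batch_no = [], batch_no + 1

    # Flush the last partial batch
    if batch:
        filesystem.get_file_client(f"cosmos-archive/batch-{batch_no:06}.json") \
                  .upload_data(json.dumps(batch), overwrite=True)

Each output file is simply a JSON array of documents; the batch size and naming scheme are arbitrary choices for the sketch.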

The second one is the change feed processor library, which allows you to have a service running at all times, listening for changes and processing them based on your needs. More details, with code examples of the change feed processor library, are here: https://learn.microsoft.com/en-us/azure/cosmos-db/change-feed#change-feed-processor

(Both links really point to the same page, just different sections; the page links to a Microsoft GitHub repo containing code examples.)

Keep in mind that you will still be charged for this in terms of RU/s, but from what I've seen it is relatively low (or at least lower than what you'd pay if you start reading the collections themselves).

Upvotes: 2
