lucuma

Reputation: 18339

DocumentDB Change Feed and saving Checkpoint

After reading the documentation, I'm having a hard time conceptualizing the change feed. Take the code from the documentation below. The second call to GetChanges picks up only the changes made since the previous call, using the checkpoints that call returned. Suppose the feed is being used to build summary data, something goes wrong, and the job needs to be re-run from an earlier point in time. I don't understand the following:

Code that reads from the beginning of the collection and then from the last checkpoint:

  // First call: no checkpoints yet, so every partition key range is read from the beginning.
  Dictionary<string, string> checkpoints = await GetChanges(client, collection, new Dictionary<string, string>());

  await client.CreateDocumentAsync(collection, new DeviceReading {
      DeviceId = "xsensr-201", MetricType = "Temperature", Unit = "Celsius", MetricValue = 1000
  });
  await client.CreateDocumentAsync(collection, new DeviceReading {
      DeviceId = "xsensr-212", MetricType = "Pressure", Unit = "psi", MetricValue = 1000
  });

  // Returns only the two documents created above.
  checkpoints = await GetChanges(client, collection, checkpoints);

  private async Task<Dictionary<string, string>> GetChanges(
      DocumentClient client,
      string collection,
      Dictionary<string, string> checkpoints)
  {
      // List every partition key range, passing the continuation back so paging advances.
      List<PartitionKeyRange> partitionKeyRanges = new List<PartitionKeyRange>();
      string pkRangesContinuation = null;

      do
      {
          FeedResponse<PartitionKeyRange> pkRangesResponse =
              await client.ReadPartitionKeyRangeFeedAsync(
                  collection,
                  new FeedOptions { RequestContinuation = pkRangesContinuation });
          partitionKeyRanges.AddRange(pkRangesResponse);
          pkRangesContinuation = pkRangesResponse.ResponseContinuation;
      }
      while (pkRangesContinuation != null);

      foreach (PartitionKeyRange pkRange in partitionKeyRanges)
      {
          // With no saved checkpoint for this range, continuation stays null.
          string continuation = null;
          checkpoints.TryGetValue(pkRange.Id, out continuation);

          IDocumentQuery<Document> query = client.CreateDocumentChangeFeedQuery(
              collection,
              new ChangeFeedOptions
              {
                  PartitionKeyRangeId = pkRange.Id,
                  StartFromBeginning = true,
                  RequestContinuation = continuation,
                  MaxItemCount = 1
              });

          while (query.HasMoreResults)
          {
              // Fetch the next page of changes without blocking the thread.
              FeedResponse<DeviceReading> readChangesResponse =
                  await query.ExecuteNextAsync<DeviceReading>();

              foreach (DeviceReading changedDocument in readChangesResponse)
              {
                  Console.WriteLine(changedDocument.Id);
              }

              // Save the latest continuation as this range's checkpoint.
              checkpoints[pkRange.Id] = readChangesResponse.ResponseContinuation;
          }
      }

      return checkpoints;
  }

Upvotes: 0

Views: 1147

Answers (2)

Aravind Krishna R.

Reputation: 8003

DocumentDB supports check-pointing only by the logical timestamp returned by the server. To retrieve all changes from X minutes ago, you have to "remember" the logical timestamp that corresponds to that clock time (the ETag returned for the collection in the REST API, or ResponseContinuation in the SDK) and then use it to retrieve the changes.

Change feed uses logical time in place of clock time because clock time can differ across servers/partitions. If you would like to see change feed support based on clock time (with some caveats on skew), please propose/upvote at https://feedback.azure.com/forums/263030-documentdb/.

To save the last checkpoint per partition key/document, you can just save the corresponding version of the batch in which it was last seen (ETag returned for the collection in the REST API, ResponseContinuation in the SDK), like Fred suggested in his answer.
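One way to "remember" which logical timestamps were current at a given clock time is to keep a small history of the checkpoint dictionaries. The sketch below is only an illustration: CheckpointHistory, Record and FindAtOrBefore are hypothetical names, not part of the DocumentDB SDK, and it assumes the history is kept in memory (a real job would persist it somewhere durable).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not an SDK type: maps a wall-clock time to the
// per-partition-key-range continuation tokens that were current at that time.
public sealed class CheckpointHistory
{
    // Snapshots ordered by the UTC time at which they were recorded.
    private readonly SortedList<DateTime, Dictionary<string, string>> snapshots =
        new SortedList<DateTime, Dictionary<string, string>>();

    // Store a copy so later changes to the caller's dictionary do not alter the snapshot.
    public void Record(DateTime utcNow, IDictionary<string, string> checkpoints)
    {
        snapshots[utcNow] = new Dictionary<string, string>(checkpoints);
    }

    // Return a copy of the latest snapshot recorded at or before utcTime.
    // An empty dictionary means no snapshot is that old.
    public Dictionary<string, string> FindAtOrBefore(DateTime utcTime)
    {
        Dictionary<string, string> best = new Dictionary<string, string>();
        foreach (KeyValuePair<DateTime, Dictionary<string, string>> entry in snapshots)
        {
            if (entry.Key > utcTime)
            {
                break; // SortedList enumerates in ascending key order.
            }
            best = new Dictionary<string, string>(entry.Value);
        }
        return best;
    }
}
```

With the GetChanges method from the question, you would call history.Record(DateTime.UtcNow, checkpoints) after each run, and re-run from an earlier point with GetChanges(client, collection, history.FindAtOrBefore(restartTimeUtc)). An empty result makes that method read each partition key range from the beginning, since no continuation is found for it.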

Upvotes: 1

Fei Han

Reputation: 27793

How to specify a particular time the checkpoint should start.

You could try providing a logical version/ETag (such as 95488), instead of null, as the RequestContinuation property of ChangeFeedOptions.

Upvotes: 1
