Tdawg90
Tdawg90

Reputation: 117

Creating multiple CosmosDB Changefeeds programmatically

I have multiple CosmosDB collections I need to monitor and write to an ADLS location.

I'm trying to create a feed factory where foreach ChangeFeed Config, spin up a new Change Feed to listen and process. The observer has a very explicit signature which I don't know how to include additional context, eg process documents of 'this' type.
Because of that, I have different observer of each type, A Observer, B Observer, ect.

Though for this I need to pass in a reference of the associated observer based on the config to the ChangeFeedProcesserBuilder(), however I get "'T' must be a non-abstract type with a public.....'

I'm kind of at a loss other than hardcoding each instance of the FeedChange

    private async Task CreateChangeFeed<T>(DocumentCollectionInfo feedCollectionInfo, DocumentCollectionInfo leaseCollectionInfo, ChangeFeedProcessorOptions options) where T : Azure.Documents.ChangeFeedProcessor.FeedProcessing.IChangeFeedObserver
    {
        var processor = await new ChangeFeedProcessorBuilder()
               .WithHostName("CosmosDBDetectorHost")
               .WithFeedCollection(feedCollectionInfo)
               .WithLeaseCollection(leaseCollectionInfo)
               .WithProcessorOptions(new ChangeFeedProcessorOptions()
               {
                   FeedPollDelay = new TimeSpan(0, ChangeFeedOptionsConstants.MinutesInterval, 0),
                   MaxItemCount = ChangeFeedOptionsConstants.MaxItemCount,
               })
               .WithObserver<T>()
               .BuildAsync();
    }

Upvotes: 0

Views: 192

Answers (1)

Matias Quaranta
Matias Quaranta

Reputation: 15613

Your T needs to have a parameterless constructor and your Function signature should explicitly state it like so (notice the new() on the signature):

private async Task CreateChangeFeed<T>(
    DocumentCollectionInfo feedCollectionInfo, 
    DocumentCollectionInfo leaseCollectionInfo, 
    ChangeFeedProcessorOptions options) 
    where T : Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing.IChangeFeedObserver, new ()
{

    var processor = await new ChangeFeedProcessorBuilder()
           .WithHostName("CosmosDBDetectorHost")
           .WithFeedCollection(feedCollectionInfo)
           .WithLeaseCollection(leaseCollectionInfo)
           .WithProcessorOptions(new ChangeFeedProcessorOptions()
           {
               FeedPollDelay = new TimeSpan(0, ChangeFeedOptionsConstants.MinutesInterval, 0),
               MaxItemCount = ChangeFeedOptionsConstants.MaxItemCount,
           })
           .WithObserver<T>()
           .BuildAsync();
}

Upvotes: 1

Related Questions