Shiva subedi

Reputation: 11

Spring Data MongoDB resume token for MessageListenerContainer

Hi, I am trying to implement a MessageListener on the Mongo oplog that resumes streaming from the last document it processed before it stopped. My current setup is below. I am not sure how to retrieve the resumeToken from the last document and set it so that, if the listener application goes down and comes back online, it continues reading after the last document it read.

    // Static imports assumed: Aggregation.newAggregation, Aggregation.match, Criteria.where
    @Bean
    MessageListenerContainer candidateMessageListenerContainer(MongoTemplate mongoTemplate,
            @Qualifier("candidateMessageListener") MessageListener documentMessageListener)
    {
        Executor executor = Executors.newSingleThreadExecutor();
        MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(mongoTemplate, executor)
        {
            @Override
            public boolean isAutoStartup()
            {
                return true;
            }
        };
        ChangeStreamRequest<Candidate> request = ChangeStreamRequest.builder(documentMessageListener)
                // Collection to listen to; no database is specified, so the default database is used
                .collection("candidate")
                // Only listen for these operation types; adjust the filter as needed
                .filter(newAggregation(match(where("operationType").in("insert", "update", "replace", "delete"))))
                // Without this, an update event carries only the changed fields;
                // UPDATE_LOOKUP returns the full document
                .fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
                .build();
        messageListenerContainer.register(request, Candidate.class);
        return messageListenerContainer;
    }

Upvotes: 1

Views: 660

Answers (1)

Christoph Strobl

Reputation: 6736

You can use the getResumeToken() or getTimestamp() methods on ChangeStreamEvent to obtain this information. See the Reference Documentation - Resuming Change Streams for details.
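
To survive a restart, the token also has to be stored somewhere outside the application. The following is only a minimal sketch of that part, assuming a plain file is an acceptable store: `ResumeTokenStore` is a hypothetical helper, not part of Spring Data, and it treats the token as an opaque JSON string.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;

// Hypothetical helper (not a Spring Data class): keeps the most recent
// resume token, serialized as a JSON string, in a single file.
final class ResumeTokenStore {

    private final Path file;

    ResumeTokenStore(Path file) {
        this.file = file;
    }

    // Write to a sibling temp file first, then replace the real file,
    // so a crash mid-write does not leave a truncated token behind.
    synchronized void save(String tokenJson) throws IOException {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, tokenJson.getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }

    // Returns the stored token, or empty if nothing has been saved yet
    // (first start, so the stream should begin from "now").
    synchronized Optional<String> load() throws IOException {
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        return Optional.of(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
    }
}
```

With the imperative listener, one way to wire this in would be to call something like `store.save(message.getRaw().getResumeToken().toJson())` after each message has been processed, and on startup, if `store.load()` returns a value, pass `BsonDocument.parse(json)` to `.resumeToken(...)` on the `ChangeStreamRequest` builder before `.build()`. Treat those calls as assumptions to check against your Spring Data MongoDB version. Resuming only works while the oplog still contains the entry the token points to.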

Upvotes: 1
