Reputation: 29
I am having a problem with mongodb (version 4.2), specifically with the Change Stream functionality.
I have a ReplicaSet cluster consisting of 1 primary, 1 secondary and 1 Arbiter and, in my java code with API mongodb-driver-sync 4.2.0-beta1, I have a .watch() process on a collection of interest. Like below:
MongoClient mongoClient = MongoClients.create("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=replica");
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collectionStream = database.getCollection("myCollection");
List<Bson> pipeline = Arrays.asList(Aggregates.match(Filters.and(Filters.in("operationType", Arrays.asList("insert", "update", "replace", "invalidate")))));
MongoCursor<ChangeStreamDocument<Document>> cursor = collectionStream.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
ChangeStreamDocument<Document> streamedEvent = cursor.next();
System.out.println("Streamed event: " + streamedEvent);
Basically, the stream works fine. When an insert/update operation occurs, the event is recognized and the document is streamed correctly. However, when one of the two nodes (either the primary or the secondary, a data-bearing one) goes down, the watcher stops streaming anything. Update/insert operations continue fine on database but the stream is blocked. As soon as I restart one of the two nodes, the stream immediately resumes correctly and also shows me the events not streamed previously. If, on the other hand, the arbiter node goes down, the stream keeps working fine.
Below my rs.conf() file and, as you can see, the WriteConcern parameter is 1.
{
"_id" : "replica",
"version" : 31,
"protocolVersion" : NumberLong(1),
"writeConcernMajorityJournalDefault" : true,
"members" : [
{
"_id" : 0,
"host" : "host1:27017",
"arbiterOnly" : false,
"buildIndexes" : true,
"hidden" : false,
"priority" : 1000,
"tags" : {
},
"slaveDelay" : NumberLong(0),
"votes" : 1
},
{
"_id" : 1,
"host" : "host2:27017",
"arbiterOnly" : false,
"buildIndexes" : true,
"hidden" : false,
"priority" : 1,
"tags" : {
},
"slaveDelay" : NumberLong(0),
"votes" : 1
},
{
"_id" : 4,
"host" : "host2:27018",
"arbiterOnly" : true,
"buildIndexes" : true,
"hidden" : false,
"priority" : 0,
"tags" : {
},
"slaveDelay" : NumberLong(0),
"votes" : 1
}
],
"settings" : {
"chainingAllowed" : true,
"heartbeatIntervalMillis" : 500,
"heartbeatTimeoutSecs" : 3,
"electionTimeoutMillis" : 3000,
"catchUpTimeoutMillis" : -1,
"catchUpTakeoverDelayMillis" : 30000,
"getLastErrorModes" : {
},
"getLastErrorDefaults" : {
"w" : 1,
"j" : false,
"wtimeout" : 0
},
"replicaSetId" : ObjectId("5f16a4e1e1c622bbea578576")
}}
Can anyone help me to solve this problem?
Updates: To solve this behavior, in every .conf file of each node I set replication.enableMajorityReadConcern equals to false, in order to disable the ReadConcernMajority. Anyway, following this setting, by stopping one of the primary or secondary node, I am always getting the following exception in console:
Exception in thread "main" com.mongodb.MongoExecutionTimeoutException: Error waiting for snapshot not less than { ts: Timestamp(1605805914, 1), t: -1 }, current relevant optime is { ts: Timestamp(1605805864, 1), t: 71 }. :: caused by :: operation exceeded time limit
at com.mongodb.internal.connection.ProtocolHelper.createSpecialException(ProtocolHelper.java:239)
at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:171)
at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:359)
at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:280)
at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:100)
at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:490)
at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:71)
at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:259)
at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:202)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:118)
at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:110)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:345)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:336)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommandWithConnection(CommandOperationHelper.java:222)
at com.mongodb.internal.operation.CommandOperationHelper$5.call(CommandOperationHelper.java:208)
at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:583)
at com.mongodb.internal.operation.CommandOperationHelper.executeCommand(CommandOperationHelper.java:205)
at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:189)
at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:325)
at com.mongodb.internal.operation.ChangeStreamOperation$1.call(ChangeStreamOperation.java:321)
at com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:583)
at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:321)
at com.mongodb.internal.operation.ChangeStreamOperation.execute(ChangeStreamOperation.java:60)
at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:178)
at com.mongodb.client.internal.ChangeStreamIterableImpl.execute(ChangeStreamIterableImpl.java:204)
at com.mongodb.client.internal.ChangeStreamIterableImpl.cursor(ChangeStreamIterableImpl.java:158)
at com.mongodb.client.internal.ChangeStreamIterableImpl.iterator(ChangeStreamIterableImpl.java:153)
at com.softstrategy.ProvaWatcher.ProvaWatcherApplication.main(ProvaWatcherApplication.java:34)
On the other hand, if I comment out the enableMajorityReadConcern in the every file .conf nodes as it is by default, that exception does not appear.
Hence my questions are the following two ones:
Thanks!
Upvotes: 2
Views: 1855
Reputation: 14520
With a PSA architecture, if either of the data bearing nodes is unavailable, you no longer have a majority of data bearing nodes present. Meaning, you would be able to insert with w:1 but not w:majority and you wouldn't be able to perform majority reads.
Change streams per https://docs.mongodb.com/manual/reference/read-concern-majority/#disable-read-concern-majority use majority read concern:
Disabling "majority" read concern disables support for Change Streams for MongoDB 4.0 and earlier. For MongoDB 4.2+, disabling read concern "majority" has no effect on change streams availability.
This is also implied by https://www.mongodb.com/blog/post/an-introduction-to-change-streams given
Total ordering
MongoDB 3.6 has a global logical clock that enables the server to order all changes across a sharded cluster. Applications will always receive changes in the order they were applied to the database.
The total ordering is only possible with a majority read.
Use PSS architecture if you wish change streams to continue producing events when one of the data bearing nodes is unavailable.
You can also try disabling read concern majority on 4.2+ but this has other issues as described in the first link.
Upvotes: 2