Reputation: 145
I would like to query MongoDB's oplog using Java and, if possible, the Spring Data MongoDB integration. My problem is creating the following query from Java:
db['oplog.rs'].find({ "ts": { $gt: Timestamp(1489568405,34) }, $and: [ { "ns": "myns" } ] })
I've tried a few things like BsonTimestamp or BSONTimestamp, which led to wrong queries. Using
BasicQuery({ "ts": { $gt: Timestamp(1489568405,34) }, $and: [ { "ns": "myns" } ] })
led to an error in the JSON parser of the Java MongoDB driver.
Any hints?
Thx Jürgen
A typical record looks like this:
{
"ts" : Timestamp(1489567144, 2),
"t" : NumberLong(2),
"h" : NumberLong(7303473893196954969),
"v" : NumberInt(2),
"op" : "i",
"ns" : "asda.jam",
"o" : {
"_id" : NumberInt(2),
"time" : ISODate("2017-03-15T08:39:00.000+0000"),
"roadDesc" : {
"roadId" : NumberInt(28102917),
"roadName" : "A480 W"
},
"posUpFront" : NumberInt(1003),
"posDownFront" : NumberInt(1003),
"_class" : "de.heuboe.acaJNI.test.Jam"
}
}
Upvotes: 0
Views: 1826
Reputation: 174
You can filter using org.bson.BsonTimestamp.
BsonTimestamp lastReadTimestamp = new BsonTimestamp(1489568405, 34);
Bson filter = new Document("$gt", lastReadTimestamp);
Then you can either use find, like this:
oplogColl.find(new Document("ts", filter));
Or you can create a tailable cursor and iterate through the documents like this:
MongoCursor<Document> oplogCursor =
oplogColl
.find(new Document("ts", filter))
.cursorType(CursorType.TailableAwait)
.noCursorTimeout(true)
.batchSize(1000)
.iterator();
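When tailing, the last-read ts usually has to be stored somewhere so the next run can resume from it. A BSON timestamp is a 64-bit value with the seconds in the high 32 bits and the increment in the low 32 bits, and values are ordered as unsigned 64-bit numbers. The following is a minimal, stdlib-only sketch of that layout; the class and method names are hypothetical and only illustrate the arithmetic. In real code, BsonTimestamp's own getValue() and long constructor should serve the same purpose.

```java
// Hypothetical helper, not part of the MongoDB driver: it only mirrors the
// 64-bit layout of a BSON timestamp (seconds high, increment low).
final class OplogPosition {

    private OplogPosition() {
    }

    // Pack seconds and increment into a single long.
    static long pack(int seconds, int increment) {
        return ((long) seconds << 32) | (increment & 0xFFFFFFFFL);
    }

    // Extract the seconds part (high 32 bits).
    static int seconds(long packed) {
        return (int) (packed >>> 32);
    }

    // Extract the increment part (low 32 bits).
    static int increment(long packed) {
        return (int) packed;
    }

    // True if position a is strictly later than position b,
    // which is the ordering a $gt filter on ts relies on.
    static boolean isAfter(long a, long b) {
        return Long.compareUnsigned(a, b) > 0;
    }

    public static void main(String[] args) {
        long lastRead = pack(1489568405, 34);
        long sample = pack(1489567144, 2); // the ts of the record in the question
        System.out.println(isAfter(sample, lastRead));
    }
}
```

With the values from the question, the sample record (1489567144, 2) is earlier than the filter timestamp (1489568405, 34), so a $gt filter on ts would not return it.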
Upvotes: 0
Reputation: 41
Mongo has an extended JSON syntax for constructs like NumberLong, Timestamp, etc. that works in the Mongo shell. Shell constructors such as Timestamp(...) are not valid JSON, so for use from Java code there is a strict JSON mode in which these types are represented as plain JSON documents (https://docs.mongodb.com/manual/reference/mongodb-extended-json/#bson-data-types-and-associated-representations). To do this with Spring Data, you can create a custom converter and register it in your MappingMongoConverter (see the snippets below). The converter should translate the data type (say BSONTimestamp) into the proper strict JSON document format.
@WritingConverter
public class BsonTimestampToDocumentConverter implements Converter<BSONTimestamp, Document> {
private static final Logger LOGGER = LoggerFactory.getLogger(BsonTimestampToDocumentConverter.class);
public BsonTimestampToDocumentConverter() {
//
}
@Override
public Document convert(BSONTimestamp source) {
LOGGER.trace(">>>> Converting BSONTimestamp to Document");
Document value = new Document();
value.put("t", source.getTime());
value.put("i", source.getInc());
return new Document("$timestamp", value);
}
}
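The converter above produces the strict-mode shape { "$timestamp": { "t": ..., "i": ... } }. As a rough illustration of what the question's shell query looks like in that form, here is a stdlib-only sketch that builds the query string. The class and method names are hypothetical, and whether the JSON parser of a particular driver or Spring Data version accepts this string is an assumption that should be checked against that version.

```java
// Hypothetical helper for illustration only: builds strict-mode extended JSON
// strings by plain concatenation, with no MongoDB classes involved.
final class OplogQueryJson {

    private OplogQueryJson() {
    }

    // Strict-mode representation of a BSON timestamp.
    static String timestampJson(int seconds, int increment) {
        return "{ \"$timestamp\": { \"t\": " + seconds + ", \"i\": " + increment + " } }";
    }

    // Filter for oplog entries newer than the given timestamp in one namespace.
    // The namespace is inserted unescaped, so it must not contain quotes or backslashes.
    static String oplogFilterJson(int seconds, int increment, String namespace) {
        return "{ \"ts\": { \"$gt\": " + timestampJson(seconds, increment)
                + " }, \"ns\": \"" + namespace + "\" }";
    }

    public static void main(String[] args) {
        System.out.println(oplogFilterJson(1489568405, 34, "myns"));
    }
}
```

The $and wrapper from the original shell query is dropped here because two top-level conditions in one filter document are already combined with a logical AND.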
Register the converter in the MappingMongoConverter like this:
public MappingMongoConverter syncLocalMappingMongoConverter() throws Exception {
MongoMappingContext mappingContext = new MongoMappingContext();
DbRefResolver dbRefResolver = new DefaultDbRefResolver(syncLocalDbFactory());
MappingMongoConverter converter = new MappingMongoConverter(dbRefResolver, mappingContext);
converter.setCustomConversions(customConversions());
return converter;
}
private CustomConversions customConversions() {
List<Converter<?, ?>> converterList = new ArrayList<>();
converterList.add(new BsonTimestampToDocumentConverter());
// add the other converters here
return new CustomConversions(CustomConversions.StoreConversions.NONE, converterList);
}
Here's an example where I used the converter to query the oplog through a repository and return records after a certain time. The "Sync" in the repository name distinguishes it from the reactive async code I was working on; the async repository looks exactly the same except that it extends ReactiveMongoRepository. The OplogRecord class is a Java bean that I created to match the structure of a MongoDB oplog record.
public interface SyncOplogRepository extends MongoRepository<OplogRecord, Long> {
@Query(value = "{ \"op\": { $nin: ['n', 'c'] } }")
List<OplogRecord> findRecordsNotEqualToNOrC();
@Query(value = "{'ts' : {$gte : ?0}, \"op\": { $nin: ['n', 'c'] } }")
List<OplogRecord> findRecordsNotEqualToNOrCAfterTime(BSONTimestamp timestamp);
@Query(value = "{'ts' : {$lt : ?0}, \"op\": { $nin: ['n', 'c'] } }")
List<OplogRecord> findRecordsNotEqualToNOrCBeforeTime(BSONTimestamp timestamp);
}
OplogRecord class
import org.bson.BsonTimestamp;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import java.util.Map;
@Document(collection = "oplog.rs")
public class OplogRecord {
@Id
private Long id;
/**
* Timestamp
*/
private BsonTimestamp ts;
/**
* Unique id for this entry
*/
private Long h;
/**
* DB and collection name of change.
*/
private String ns;
/**
* The actual document that was modified/inserted/deleted
*/
private Map<String, Object> o;
/**
* The operation that was performed
*/
private String op;
/**
* Replication term (election term) in which the operation was written
*/
private Long t;
/**
* Version of the oplog entry format
*/
private Integer v;
public BsonTimestamp getTs() {
return ts;
}
public void setTs(BsonTimestamp ts) {
this.ts = ts;
}
public Long getH() {
return h;
}
public void setH(Long h) {
this.h = h;
}
public String getNs() {
return ns;
}
public void setNs(String ns) {
this.ns = ns;
}
public Map<String, Object> getO() {
return o;
}
public void setO(Map<String, Object> o) {
this.o = o;
}
public String getOp() {
return op;
}
public void setOp(String op) {
this.op = op;
}
public Long getT() {
return t;
}
public void setT(Long t) {
this.t = t;
}
public Integer getV() {
return v;
}
public void setV(Integer v) {
this.v = v;
}
}
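The ns field of OplogRecord holds the database and collection name joined by a dot, as in "asda.jam" from the sample record. Since database names cannot contain a dot while collection names can, splitting at the first dot is one way to separate the two. A small sketch follows, with a hypothetical class name; it assumes the entry actually carries a db.collection namespace, which is likely not the case for the no-op and command entries that the repository queries above filter out.

```java
// Hypothetical helper for illustration: splits an oplog "ns" value
// into its database and collection parts at the first dot.
final class OplogNamespace {

    final String database;
    final String collection;

    private OplogNamespace(String database, String collection) {
        this.database = database;
        this.collection = collection;
    }

    static OplogNamespace parse(String ns) {
        int dot = ns.indexOf('.');
        // Reject values without a dot or with an empty database or collection part.
        if (dot <= 0 || dot == ns.length() - 1) {
            throw new IllegalArgumentException("Not a db.collection namespace: " + ns);
        }
        return new OplogNamespace(ns.substring(0, dot), ns.substring(dot + 1));
    }

    public static void main(String[] args) {
        OplogNamespace parsed = parse("asda.jam");
        System.out.println(parsed.database + " / " + parsed.collection);
    }
}
```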
Upvotes: 3