JayBee
JayBee

Reputation: 145

Query oplog timestamp with spring mongo

I would like to query mongodbs oplog using Java and if possible the spring mongodb integration. My problem is to create the following query from java:

db['oplog.rs'].find({ "ts": { $gt: Timestamp(1489568405,34) }, $and: [ { "ns": "myns" } ] })

I've tried a few things like BsonTimestamp or BSONTimestamp which lead to wrong querys. Using

BasicQuery({ "ts": { $gt: Timestamp(1489568405,34) }, $and: [ { "ns": "myns" } ] }) 

lead to an error in the JSON parser of the java mongodb driver.

Any hints ?

Thx Jürgen

A typical record looks like this:

{ 
    "ts" : Timestamp(1489567144, 2), 
    "t" : NumberLong(2), 
    "h" : NumberLong(7303473893196954969), 
    "v" : NumberInt(2), 
    "op" : "i", 
    "ns" : "asda.jam", 
    "o" : {
        "_id" : NumberInt(2), 
        "time" : ISODate("2017-03-15T08:39:00.000+0000"), 
        "roadDesc" : {
            "roadId" : NumberInt(28102917), 
            "roadName" : "A480 W"
        }, 
        "posUpFront" : NumberInt(1003), 
        "posDownFront" : NumberInt(1003), 
        "_class" : "de.heuboe.acaJNI.test.Jam"
    }
}

Upvotes: 0

Views: 1826

Answers (2)

samo
samo

Reputation: 174

You can filter using org.bson.BsonTimestamp.

BsonTimestamp lastReadTimestamp = new BsonTimestamp(1489568405, 34);
Bson filter = new Document("$gt", lastReadTimestamp);

And then you can either use find, like this,

oplogColl.find(new Document("ts", filter));

Or you can create a bailable cursor and iterate through the documents like this,

MongoCursor oplogCursor =
                    oplogColl
                            .find(new Document("ts", filter))
                            .cursorType(CursorType.TailableAwait)
                            .noCursorTimeout(true)
                            .batchSize(1000)
                            .iterator();

Upvotes: 0

Raghavan
Raghavan

Reputation: 41

Mongo has an extended JSON syntax for constructs like NumberLong, Timestamp etc. which works on the Mongo shell. In order to make it work in Java code, they have a strict JSON mode where these operators are represented using JSON (https://docs.mongodb.com/manual/reference/mongodb-extended-json/#bson-data-types-and-associated-representations). To do this using Java you can create a custom converter and register it in your MappingMongoConverter (see snippet below). The converter should translate the data type (say BSONTimestamp) into the proper strict JSON document format.

@WritingConverter
public class BsonTimestampToDocumentConverter implements Converter<BSONTimestamp, Document> {

  private static final Logger LOGGER = LoggerFactory.getLogger(BsonTimestampToDocumentConverter.class);

  public BsonTimestampToDocumentConverter() {
    //
  }

  @Override
  public Document convert(BSONTimestamp source) {
    LOGGER.trace(">>>> Converting BSONTimestamp to Document");
    Document value = new Document();
    value.put("t", source.getTime());
    value.put("i", source.getInc());
    return new Document("$timestamp", value);
  }
}

Register it in the MappingMongoConverter like this

 public MappingMongoConverter syncLocalMappingMongoConverter() throws Exception {
    MongoMappingContext mappingContext = new MongoMappingContext();
    DbRefResolver dbRefResolver = new DefaultDbRefResolver(syncLocalDbFactory());
    MappingMongoConverter converter = new MappingMongoConverter(dbRefResolver, mappingContext);
    converter.setCustomConversions(customConversions());

    return converter;
 }


  private CustomConversions customConversions()  {
   List<Converter<?, ?>> converterList = new ArrayList<>();
   converterList.add(new BsonTimestampToDocumentConverter());
   // add the other converters here
   return new CustomConversions(CustomConversions.StoreConversions.NONE, converterList);
}

Here's an example where I used it to query the oplog repository to return records after a certain time (the Sync in the repository was used to distinguish it from reactive async stuff I was working on. The Async repository looks exactly the same except it should extend ReactiveMongoRepository). The OplogRecord class is a Java bean that I created to match the structure of a MongoDb oplog record.

public interface SyncOplogRepository extends MongoRepository<OplogRecord, Long> {

  @Query(value = "{ \"op\": { $nin: ['n', 'c'] } }") List<OplogRecord> findRecordsNotEqualToNOrC();

  @Query(value = "{'ts' : {$gte : ?0}, \"op\": { $nin: ['n', 'c'] } }")
  List<OplogRecord> findRecordsNotEqualToNOrCAfterTime(BSONTimestamp timestamp);

  @Query(value = "{'ts' : {$lt : ?0}, \"op\": { $nin: ['n', 'c'] } }") 
  List<OplogRecord> findRecordsNotEqualToNOrCBeforeTime(BSONTimestamp timestamp);

}

OplogRecord class

import com.mongodb.DBObject;
import org.bson.BsonTimestamp;
import org.bson.types.BSONTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

import java.util.Map;


@Document(collection = "oplog.rs")
public class OplogRecord {

  @Id
  private Long id;

  /**
   * Timestamp
   */
  private BsonTimestamp ts;

  /**
   * Unique id for this entry
   */
  private Long h;

  /**
   * DB and collection name of change.
   */
  private String ns;

  /**
   * The actual document that was modified/inserted/deleted
   */
  private Map<String, Object> o;

  /**
   * The operation that was performed
   */
  private String op;

  /**
   * ??
   */
  private Long t;

  /**
   * ??
   */
  private Integer v;

  public BsonTimestamp getTs() {
    return ts;
  }

  public void setTs(BsonTimestamp ts) {
    this.ts = ts;
  }

  public Long getH() {
    return h;
  }

  public void setH(Long h) {
    this.h = h;
  }

  public String getNs() {
    return ns;
  }

  public void setNs(String ns) {
    this.ns = ns;
  }

  public Map<String, Object> getO() {
    return o;
  }

  public void setO(Map<String, Object> o) {
    this.o = o;
  }

  public String getOp() {
    return op;
  }

  public void setOp(String op) {
    this.op = op;
  }

  public Long getT() {
    return t;
  }

  public void setT(Long t) {
    this.t = t;
  }

  public Integer getV() {
    return v;
  }

  public void setV(Integer v) {
    this.v = v;
  }
}

~
~

Upvotes: 3

Related Questions