Harmeet Singh Taara
Harmeet Singh Taara

Reputation: 6611

Reactive Mongo: Trouble For Sync Multiple Queries In Reactive Mongo

I am using an Reactive Mongo with Scala and Play-Framework. I need to hitmultiple queries for update the result and retrieve the updated result. But in reactive our all queries are work in different threads, some time i need to sync the queries. How this is possible using Reactive Mongo. Follwoing is my code:

def videoDetails(videoId: String) = Action.async{
logger.info("In videoDetails controller method");

var videoIds: List[JsObject] = CustomUtility.convertStringIdToJsonCriteria(List[String]{videoId});
var query = Json.obj("_id" -> Json.obj("$in" -> videoIds));
var cursor: Cursor[Video] = videosCollection.find(query).cursor[Video];
var future: Future[Option[Video]] = cursor.headOption;
var hitCount: Future[Int] = future.map { videos => {
  if(!videos.isEmpty){
    videos.get.hitCount.get
  }else {
    0;
  }
}}
hitCount.flatMap { count => {
  videosCollection.update(query, Json.obj("$set" -> Json.obj("hitCount" -> (count+1)))).map{ lastError =>
      logger.debug(s"Successfully updated with LastError: $lastError");
    }
  var cursor: Cursor[Video] = videosCollection.find(query).cursor[Video];
  var future: Future[Option[Video]] = cursor.headOption;
  future.map { option => {
   if(!option.isEmpty) Ok(Json.toJson(option.get)) else Ok(Json.toJson(""))
  }}
}}
}

In above code, firstly i am fetch the record hit count using query, and in future map, create the update query for update my hit count of records. After the record is updated, Need to fetch latest detail for docuemnt. But accortding to above code. In every hit to the controller, that data is not consist. Because some, if all threads run one after another then the resule is good other wise, result is not right, because all threads run asyn. How can we process all thread in sync?

In Reactie Mongo, for create DTO is also a problem, some time data comes but some times not.

UPDATE

From the answer by @cchantep i update my code as below:

def videoDetails(videoId: String) = Action.async{
logger.info("In videoDetails controller method");

var videoIds: List[JsObject] = CustomUtility.convertStringIdToJsonCriteria(List[String]{videoId});
var query = Json.obj("_id" -> Json.obj("$in" -> videoIds));
var cursor: Cursor[Video] = videosCollection.find(query).cursor[Video];
for{
  hitCount <- cursor.headOption
  _ <- Promise.successful(hitCount.map { video => {
        videosCollection.update(query, Json.obj("$set" -> Json.obj("hitCount" -> (video.hitCount.get+1)))).map{ lastError =>
            logger.debug(s"Successfully updated with LastError: $lastError");
          }
      }}).future
  resultVideo <- cursor.headOption
  result <- Promise.successful(resultVideo.map { video => {
              Ok(Json.toJson(video))
            }}).future
}yield result.getOrElse(NotFound)
}

But still the data is not sync. some time i return the chages in video object and some time not.

Upvotes: 0

Views: 336

Answers (1)

cchantep
cchantep

Reputation: 9168

You can sequence the Mongo operations using for-comprehension.

for {
  a <- colA.find(...)
  b <- colB.find(...)
  _ <- colB.update(..., ...)
  c <- colB.find(...)//find if update ok
} yield c

Upvotes: 1

Related Questions