blue-sky

Reputation: 53856

Understanding implicits and subscribe in the MongoDB Scala driver source

I'm trying to get a better understanding of the MongoDB Scala driver source.

Using the MongoDB Scala driver (API docs: http://mongodb.github.io/mongo-scala-driver/),

when I use

      val collection: MongoCollection[Document] = database.getCollection("mycollection")

      val observable: Observable[Completed] = collection.insertOne(doc)

      observable.subscribe(new Observer[Completed] {
        override def onNext(result: Completed): Unit = println("Inserted")
        override def onError(e: Throwable): Unit = println("Failed")
        override def onComplete(): Unit = println("Completed")
      })

Is this implicit method

    /**
     * Subscribes to the [[Observable]] and requests `Long.MaxValue`.
     *
     * Uses the default or overridden `onNext`, `onError`, `onComplete` partial functions.
     *
     * @param doOnNext anonymous function to apply to each emitted element.
     * @param doOnError anonymous function to apply if there is an error.
     * @param doOnComplete anonymous function to apply on completion.
     */
    def subscribe(doOnNext: T => Any, doOnError: Throwable => Any, doOnComplete: () => Any): Unit = {
      observable.subscribe(new Observer[T] {
        override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue)

        override def onNext(tResult: T): Unit = doOnNext(tResult)

        override def onError(throwable: Throwable): Unit = doOnError(throwable)

        override def onComplete(): Unit = doOnComplete()

      })
    }

Source: https://github.com/mongodb/mongo-scala-driver/blob/master/driver/src/main/scala/org/mongodb/scala/ObservableImplicits.scala

called from this method?

  /**
   * Request `Observable` to start streaming data.
   *
   * This is a "factory method" and can be called multiple times, each time starting a new [[Subscription]].
   * Each `Subscription` will work for only a single [[Observer]].
   *
   * If the `Observable` rejects the subscription attempt or otherwise fails it will signal the error via [[Observer.onError]].
   *
   * @param observer the `Observer` that will consume signals from this `Observable`
   */
  def subscribe(observer: Observer[_ >: T]): Unit

Source: https://github.com/mongodb/mongo-scala-driver/blob/master/driver/src/main/scala/org/mongodb/scala/Observable.scala

It seems that calling subscribe starts a new thread (since it is called subscribe), but I don't see where this thread is created in the source.

Are implicits used to achieve the 'wiring' that invokes the implicit subscribe method when I call observable.subscribe(new Observer[Completed] { ... })?

Update:

Running this code:

import org.mongodb.scala._
import org.mongodb.scala.bson.collection.immutable.Document
import org.scalatest._

class MongoSpec extends FlatSpec with Matchers {

  "Test MongoDb" should "insert" in {
    {
      val mongoClient: MongoClient = MongoClient()
      val database: MongoDatabase = mongoClient.getDatabase("scala-poc");

      val doc: Document = Document("_id" -> 6, "name" -> "MongoDB", "type" -> "database",
        "count" -> 1, "info" -> Document("x" -> 203, "y" -> 100))

      val collection: MongoCollection[Document] = database.getCollection("documents");

      val observable: Observable[Completed] = collection.insertOne(doc)

      observable.subscribe(new Observer[Completed] {
        override def onNext(result: Completed): Unit = println("Inserted")
        override def onError(e: Throwable): Unit = println(" \n\nFailed " + e + "\n\n")
        override def onComplete(): Unit = println("Completed")
      })

      mongoClient.close();

    }

  }
}

causes the exception below:

Failed com.mongodb.MongoClientException: Shutdown in progress

mongoClient.close() is being invoked before the insertOne operation has completed.

So is insertOne or subscribe asynchronous?

Upvotes: 2

Views: 2229

Answers (1)

Alexey Romanov

Reputation: 170805

  1. No, it is the other way around: subscribe(doOnNext, doOnError, doOnComplete) calls subscribe(observer), as you can see from the implementation quoted in your question. If it were also called from there, you'd get an infinite loop. The "wiring" is only used when you write something like observable.subscribe(x => println(s"next = $x"), error => error.printStackTrace(), () => {}).

  2. No, subscribe doesn't create a new thread. Classes implementing Observable mostly wrap classes from the Java MongoDB driver and call their own subscribe methods, e.g. override def subscribe(observer: Observer[_ >: TResult]): Unit = observe(wrapped).subscribe(observer). These subscribe methods also don't start new threads: see https://mongodb.github.io/mongo-java-driver/3.1/driver-async/reference/observables/ for some explanation.
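
To make the first point concrete, here is a minimal sketch of how an implicit class can add a three-function subscribe overload on top of a trait that only declares subscribe(observer). The Observable and Observer traits below are simplified stand-ins that I made up for illustration, not the real org.mongodb.scala types, and SubscribeSketch, RichObservable and just are hypothetical names. The stand-in observable also emits synchronously on the calling thread, which is an assumption of the sketch and not a statement about how the driver delivers results.

```scala
import scala.collection.mutable.ListBuffer

object SubscribeSketch {

  // Simplified stand-ins (hypothetical), not the driver's real types.
  trait Observer[T] {
    def onNext(result: T): Unit
    def onError(e: Throwable): Unit
    def onComplete(): Unit
  }

  trait Observable[T] {
    // The only subscribe method that the trait itself declares.
    def subscribe(observer: Observer[_ >: T]): Unit
  }

  // Implicit wrapper: adds a subscribe overload that takes three functions
  // and delegates to the trait's subscribe(observer).
  implicit class RichObservable[T](observable: Observable[T]) {
    def subscribe(doOnNext: T => Any, doOnError: Throwable => Any, doOnComplete: () => Any): Unit =
      observable.subscribe(new Observer[T] {
        override def onNext(result: T): Unit = doOnNext(result)
        override def onError(e: Throwable): Unit = doOnError(e)
        override def onComplete(): Unit = doOnComplete()
      })
  }

  // Toy observable that emits one value and completes, on the caller's thread.
  def just[T](value: T): Observable[T] = new Observable[T] {
    override def subscribe(observer: Observer[_ >: T]): Unit = {
      observer.onNext(value)
      observer.onComplete()
    }
  }

  def run(): List[String] = {
    val events = ListBuffer.empty[String]
    val observable = just(42)

    // Passing an Observer resolves to the trait method; no implicit is involved.
    observable.subscribe(new Observer[Int] {
      override def onNext(result: Int): Unit = events += s"observer next=$result"
      override def onError(e: Throwable): Unit = events += "observer error"
      override def onComplete(): Unit = events += "observer complete"
    })

    // Passing three functions does not match the trait method, so the compiler
    // wraps the observable in RichObservable and calls its overload instead.
    observable.subscribe(
      (x: Int) => events += s"lambda next=$x",
      (e: Throwable) => events += "lambda error",
      () => events += "lambda complete"
    )

    events.toList
  }
}
```

Calling SubscribeSketch.run() exercises both call styles. The first goes straight to the trait method, while the second only compiles because of the implicit class, which then forwards to the same trait method.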

Upvotes: 1
