TechNewBie

Reputation: 31

How to perform batch insertions in Couchbase with the latest Java SDK 3.1.2

I was able to perform batch insertions with an older version of the Java SDK (2.9.7) using the code below:

public void insertAll(Collection<JsonDocument> documents) {

    // Insert every document asynchronously, then block until the last insert completes.
    Observable.from(documents).flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(final JsonDocument docToInsert) {
            return couchbaseConfig.catalogBucket().async().insert(docToInsert)
                    .doOnError((Throwable throwable) -> log.error(
                            "Exception {} occurred while inserting document {} to cb", throwable.getMessage(),
                            docToInsert));
        }
    }).last().toBlocking().single();

}

We have a requirement to upgrade to the latest version of the Java SDK, 3.1.2. I was not able to find much help in the Couchbase documentation. Any links or pointers would be much appreciated.

Upvotes: 2

Views: 1256

Answers (1)

dnault

Reputation: 8899

The Couchbase Java SDK 3.x documentation on Batching assumes familiarity with reactive programming, and Project Reactor in particular.

To update your code for SDK 3, you will need to:

  1. Supply your own JsonDocument class.
  2. Write to a Collection instead of a Bucket.
  3. Migrate from RxJava to Reactor (or use an adapter library).

Supply your own JsonDocument class

SDK 3 no longer requires or provides document classes like SDK 2's JsonDocument. You are free to model your document ID and content however you like. Here's a class you can use to aid your transition to SDK 3:

import com.couchbase.client.java.json.JsonObject;

public class JsonDocument {
  private final String id;
  private final JsonObject content;

  public JsonDocument(String id, JsonObject content) {
    this.id = id;
    this.content = content;
  }

  public String getId() { return id; }

  public JsonObject getContent() { return content; }

  @Override
  public String toString() {
    return "JsonDocument{id='" + id + "', content=" + content + "}";
  }
}

Write to a Collection instead of a Bucket

In SDK 3, each bucket has one or more "scopes". Each scope contains one or more "collections". Scopes help with multitenancy; each customer or deployment could be assigned its own scope. Collections help you organize your documents; you could put widgets and invoices in separate collections, for example.

Each bucket has a default scope, and that scope has a default collection.
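To make the bucket, scope, and collection hierarchy concrete, here is a small sketch that only models the naming. `Keyspace` is a hypothetical class for illustration, not part of the SDK; the one fact it encodes is that Couchbase names both the default scope and the default collection `_default`, so a bucket's default collection can be written as `bucket._default._default`.

```java
import java.util.Objects;

/**
 * Hypothetical value type (not an SDK class) naming where a document lives:
 * bucket, then scope, then collection.
 */
public final class Keyspace {
  /** Name used for both the default scope and the default collection. */
  public static final String DEFAULT = "_default";

  private final String bucket;
  private final String scope;
  private final String collection;

  public Keyspace(String bucket, String scope, String collection) {
    this.bucket = Objects.requireNonNull(bucket);
    this.scope = Objects.requireNonNull(scope);
    this.collection = Objects.requireNonNull(collection);
  }

  /** The default collection inside the default scope of the given bucket. */
  public static Keyspace defaultOf(String bucket) {
    return new Keyspace(bucket, DEFAULT, DEFAULT);
  }

  /** True only when both the scope and the collection are the defaults. */
  public boolean isDefault() {
    return DEFAULT.equals(scope) && DEFAULT.equals(collection);
  }

  /** Dotted form, e.g. "catalog._default._default". */
  @Override
  public String toString() {
    return bucket + "." + scope + "." + collection;
  }
}
```

In the SDK itself, the corresponding calls are `bucket.defaultCollection()` for the default keyspace and `bucket.scope("tenant1").collection("widgets")` for a named one (the scope and collection names here are made up).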

Support for scopes and collections is coming in Couchbase Server 7, but SDK 3 asks you to think about them now. All of the get/insert/remove/etc. methods that used to be part of the Bucket class have moved to the Collection class.

Fortunately, there's a convenience method, defaultCollection(), for accessing a bucket's default collection.

What does this mean for your code? In SDK 2 you had:

AsyncBucket catalog = couchbaseConfig.catalogBucket().async();

In SDK 3 you would write:

ReactiveCollection catalog = couchbaseConfig.catalogBucket()
    .defaultCollection()
    .reactive();

Migrate from RxJava to Reactor

As you're probably aware, the Couchbase Java SDK 2.x used RxJava for its reactive model. SDK 3 uses Reactor instead. The concepts are mostly the same, but the names of the reactive primitives are different:

Observable<T> -> Flux<T>

Single<T> -> Mono<T>

Completable -> Mono<Void>

The Reactor reference documentation is your friend here.

Putting it all together

Assuming documents is a list of instances of the JsonDocument class above, here's what your code might look like in SDK 3:

ReactiveCollection destCollection = couchbaseConfig.catalogBucket()
    .defaultCollection()
    .reactive();

Flux.fromIterable(documents)
    .flatMap(docToInsert ->
        destCollection.insert(docToInsert.getId(), docToInsert.getContent())
            .doOnError(throwable -> log.error(
                "Exception {} occurred while inserting document {} to cb",
                throwable.getMessage(), docToInsert)))
    .blockLast();

An issue with this code (as well as the SDK 2 version) is that if any insertion fails, the remaining documents will not be inserted. If you want to continue inserting the other documents, you can use onErrorResume() instead of doOnError():

Flux.fromIterable(documents)
    .flatMap(docToInsert ->
        destCollection.insert(docToInsert.getId(), docToInsert.getContent())
            .onErrorResume(throwable -> {
              log.error("Exception {} occurred while inserting document {} to cb",
                  throwable.getMessage(), docToInsert);
              return Mono.empty();
            }))
    .blockLast();
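SDK 3 also has an async API, reached with `collection.async()`, whose `insert(id, content)` returns a `CompletableFuture<MutationResult>`. If you'd rather not adopt Reactor, the sketch below shows one way to batch with plain JDK types under a few assumptions: it inserts in fixed-size groups so only a bounded number of operations are in flight, keeps going when an insert fails, and reports which IDs failed instead of only logging them. `BatchInsertSketch`, `BatchResult`, `insertInBatches`, and the `insertFn` parameter are hypothetical names; `insertFn` stands in for the SDK call so the sketch compiles and runs without a cluster.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

public final class BatchInsertSketch {
  private BatchInsertSketch() {}

  /** IDs that were inserted and IDs whose insert failed, in completion order. */
  public static final class BatchResult {
    public final List<String> succeeded = Collections.synchronizedList(new ArrayList<>());
    public final List<String> failed = Collections.synchronizedList(new ArrayList<>());
  }

  /**
   * Inserts docs in groups of at most batchSize. Every insert in a group is started
   * before any is awaited; the whole group is then awaited before the next one starts.
   * A failed insert is recorded and does not stop the remaining inserts.
   *
   * insertFn is a hypothetical stand-in for something like
   * (id, content) -> asyncCollection.insert(id, content). It must report failures through
   * the returned future; an exception thrown directly by insertFn is not caught here.
   */
  public static <C> BatchResult insertInBatches(
      Map<String, C> docs,
      int batchSize,
      BiFunction<String, C, CompletableFuture<?>> insertFn) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    BatchResult result = new BatchResult();
    List<Map.Entry<String, C>> entries = new ArrayList<>(docs.entrySet());

    for (int start = 0; start < entries.size(); start += batchSize) {
      int end = Math.min(start + batchSize, entries.size());
      List<CompletableFuture<?>> pending = new ArrayList<>();

      for (Map.Entry<String, C> entry : entries.subList(start, end)) {
        String id = entry.getKey();
        // handle() runs on both success and failure, so one failed insert
        // never makes the batch future fail.
        pending.add(insertFn.apply(id, entry.getValue()).handle((ignored, err) -> {
          if (err == null) {
            result.succeeded.add(id);
          } else {
            result.failed.add(id);
          }
          return null;
        }));
      }

      // Block until every insert in this batch has finished before starting the next batch.
      CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
    }
    return result;
  }
}
```

Against a real cluster you might call it as `insertInBatches(docsById, 100, (id, content) -> asyncCatalog.insert(id, content))`, where `asyncCatalog` would be `couchbaseConfig.catalogBucket().defaultCollection().async()`, `docsById` is a hypothetical `Map<String, JsonObject>`, and the batch size of 100 is arbitrary. In the Reactor version, a similar bound on in-flight inserts can be set by passing a concurrency argument, as in `flatMap(mapper, concurrency)`.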

Upvotes: 4
