Reputation: 31
I was able to perform batch insertion with an older version of the Java SDK (2.9.7) using the code below.
public void insertAll(Collection<JsonDocument> documents) {
    Observable.from(documents).flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(final JsonDocument docToInsert) {
            return couchbaseConfig.catalogBucket().async().insert(docToInsert)
                    .doOnError((Throwable throwable) -> log.error(
                            "Exception {} occurred while inserting document {} to cb", throwable.getMessage(),
                            docToInsert));
        }
    }).last().toBlocking().single();
}
We have a requirement to upgrade to the latest version of the Java SDK, 3.1.2. I could not find much help in the Couchbase documentation. Any links or pointers on how to do the above would be much appreciated.
Upvotes: 2
Views: 1256
Reputation: 8899
The Couchbase Java SDK 3.x documentation on Batching assumes familiarity with reactive programming, and Project Reactor in particular.
To update your code for SDK 3, you will need to:
Replace the JsonDocument class.
Use a Collection instead of a Bucket.
Migrate from RxJava to Reactor.

Replacing the JsonDocument class
SDK 3 no longer requires or provides document classes like SDK 2's JsonDocument. You are free to model your document ID and content however you like. Here's a class you can use to aid your transition to SDK 3:
import com.couchbase.client.java.json.JsonObject;

// Simple immutable holder that pairs a document ID with its JSON content.
public class JsonDocument {
    private final String id;
    private final JsonObject content;

    public JsonDocument(String id, JsonObject content) {
        this.id = id;
        this.content = content;
    }

    public String getId() { return id; }

    public JsonObject getContent() { return content; }

    @Override
    public String toString() {
        return "JsonDocument{id='" + id + "', content=" + content + "}";
    }
}
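Since SDK 3's insert() accepts the content as a plain Object, the holder does not have to be tied to JsonObject. As a minimal sketch of "model it however you like", here is a generic variant; the names DocumentHolderSketch and Doc are hypothetical, and the Map content is only an illustration that runs without the SDK on the classpath:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class DocumentHolderSketch {

    // Hypothetical holder (not part of the SDK): an ID plus content of any type T.
    static final class Doc<T> {
        private final String id;
        private final T content;

        Doc(String id, T content) {
            this.id = Objects.requireNonNull(id, "id");
            this.content = Objects.requireNonNull(content, "content");
        }

        String getId() { return id; }

        T getContent() { return content; }

        @Override
        public String toString() {
            return "Doc{id='" + id + "', content=" + content + "}";
        }
    }

    public static void main(String[] args) {
        // A Map stands in for whatever content type you choose.
        Map<String, Object> widget = new LinkedHashMap<>();
        widget.put("type", "widget");
        widget.put("price", 9.99);

        Doc<Map<String, Object>> doc = new Doc<>("widget::1", widget);
        System.out.println(doc.getId() + " -> " + doc.getContent());
    }
}
```

With a holder like this you would pass getId() and getContent() to insert() the same way as with the JsonDocument class above.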
Using a Collection instead of a Bucket
In SDK 3, each bucket has one or more "scopes". Each scope contains one or more "collections". Scopes help with multitenancy; each customer or deployment could be assigned its own scope. Collections help you organize your documents; you could put widgets and invoices in separate collections, for example.
Each bucket has a default scope, and that scope has a default collection.
Support for scopes and collections is coming in Couchbase Server 7, but SDK 3 asks you to think about them now. All of the get/insert/remove/etc. methods that used to be part of the Bucket class have moved to the Collection class.
Fortunately there's a convenience method for accessing a bucket's default collection.
What does this mean for your code? In SDK 2 you had:
AsyncBucket catalog = couchbaseConfig.catalogBucket().async();
In SDK 3 you would write:
ReactiveCollection catalog = couchbaseConfig.catalogBucket()
        .defaultCollection()
        .reactive();

Migrating from RxJava to Reactor
As you're probably aware, the Couchbase Java SDK 2.x used RxJava for its reactive model. SDK 3 uses Reactor instead. The concepts are mostly the same, but the names of the reactive primitives are different:
Observable<T> -> Flux<T>
Single<T> -> Mono<T>
Completable -> Mono<Void>
The Reactor reference documentation is your friend here.
Assuming documents is a list of instances of the JsonDocument class shown above, here's what your code might look like in SDK 3:
ReactiveCollection destCollection = couchbaseConfig.catalogBucket()
        .defaultCollection()
        .reactive();
Flux.fromIterable(documents)
        .flatMap(docToInsert ->
                destCollection.insert(docToInsert.getId(), docToInsert.getContent())
                        .doOnError(throwable -> log.error(
                                "Exception {} occurred while inserting document {} to cb",
                                throwable.getMessage(), docToInsert)))
        .blockLast();
An issue with this code (as well as the SDK 2 version) is that if any insertion fails, the remaining documents will not be inserted. If you want to continue inserting the other documents, you can use onErrorResume() instead of doOnError():
Flux.fromIterable(documents)
        .flatMap(docToInsert ->
                destCollection.insert(docToInsert.getId(), docToInsert.getContent())
                        .onErrorResume(throwable -> {
                            log.error("Exception {} occurred while inserting document {} to cb",
                                    throwable.getMessage(), docToInsert);
                            return Mono.empty();
                        }))
        .blockLast();
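The onErrorResume() version logs failures but does not tell the caller which documents failed. If you need that, one option is to record the failed IDs as you go. SDK 3 also has a CompletableFuture-based API (collection.async()), so the rough sketch below uses plain CompletableFuture rather than Reactor. It is an analogy, not SDK code: BatchInsertSketch, insertAll and fakeInsert are hypothetical names, and fakeInsert is an in-memory stand-in for the real insert call so the example runs without a cluster.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public class BatchInsertSketch {

    // Starts one insert per document, waits for all of them, and returns the
    // sorted IDs of the documents whose insert failed.
    // `insert` is a hypothetical stand-in for an asynchronous insert call.
    static List<String> insertAll(
            Map<String, String> documents,
            BiFunction<String, String, CompletableFuture<Void>> insert) {
        Queue<String> failedIds = new ConcurrentLinkedQueue<>();
        List<CompletableFuture<Void>> pending = documents.entrySet().stream()
                .map(entry -> insert.apply(entry.getKey(), entry.getValue())
                        // Record the failure and recover so the batch keeps going.
                        .exceptionally(throwable -> {
                            failedIds.add(entry.getKey());
                            return null;
                        }))
                .collect(Collectors.toList());
        // Block until every insert has succeeded or been recorded as failed.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        List<String> result = new ArrayList<>(failedIds);
        Collections.sort(result);
        return result;
    }

    // In-memory stand-in for a bucket (assumption for this sketch): the insert
    // fails when the ID is already present.
    static CompletableFuture<Void> fakeInsert(Map<String, String> store, String id, String content) {
        return CompletableFuture.runAsync(() -> {
            if (store.putIfAbsent(id, content) != null) {
                throw new IllegalStateException("document already exists: " + id);
            }
        });
    }

    public static void main(String[] args) {
        Map<String, String> store = new ConcurrentHashMap<>();
        store.put("b", "{\"existing\":true}");

        Map<String, String> documents = new LinkedHashMap<>();
        documents.put("a", "{}");
        documents.put("b", "{}");
        documents.put("c", "{}");

        List<String> failed = insertAll(documents, (id, content) -> fakeInsert(store, id, content));
        System.out.println("failed: " + failed);
    }
}
```

Here only "b" is reported as failed, while "a" and "c" are still stored. The same idea carries over to the Reactor version: add the ID to a thread-safe collection inside onErrorResume() before returning Mono.empty().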
Upvotes: 4