Reputation: 740
I'm using the MongoDB Reactive Streams Java API which I implemented following this example, but I'm encountering a serious problem: sometimes, when I try to query a collection, the await
methods doesn't work, and it hangs until the timeout is reached.
The onSubscribe
methods gets called correctly, but then neither onNext
, nor onError
nor onComplete
get called.
There doesn't seem to be a specific circumstance causing this issue.
This is my code
MongoDatabase database = MongoDBConnector.getClient().getDatabase("myDb");
MongoCollection<Document> collection = database.getCollection("myCollection");
FindPublisher<Document> finder = collection.find(Filters.exists("myField"));
SettingSubscriber tagSub = new SettingSubscriber(finder);
//SettingsSubscriber is a subclass of ObservableSubscriber which calls publisher.subscribe(this)
tagSub.await(); //this is where it hangs
return tagSub.getWrappedData();
Upvotes: 3
Views: 1033
Reputation: 4971
I wrote a simple implementation of what I assumed the SettingSubscriber
looked like and tried to recreate the problem using a groovy script. I couldn't - my code runs without hanging, prints each output record and exits. Code for reference below:
@Grab(group = 'org.mongodb', module = 'mongodb-driver-reactivestreams', version = '4.3.3')
@Grab(group = 'org.slf4j', module = 'slf4j-api', version = '1.7.32')
@Grab(group = 'ch.qos.logback', module = 'logback-classic', version = '1.2.6')
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.FindPublisher;
import com.mongodb.client.model.Filters;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CountDownLatch;
MongoClientSettings.Builder clientSettingsBuilder = MongoClientSettings.builder()
.applyToClusterSettings { clusterSettingsBuilder ->
clusterSettingsBuilder.hosts( Arrays.asList(new ServerAddress("localhost", 27017)))
};
MongoClient mongoClient = MongoClients.create(clientSettingsBuilder.build());
MongoDatabase database = mongoClient.getDatabase("myDb");
MongoCollection<Document> collection = database.getCollection("myCollection");
FindPublisher<Document> finder = collection.find(Filters.exists("myField"));
SettingSubscriber tagSub = new SettingSubscriber(finder);
tagSub.await();
class SettingSubscriber implements Subscriber<Document> {
private final CountDownLatch latch = new CountDownLatch(1);
private Subscription subscription;
private List<Document> data = new ArrayList<>();
public SettingSubscriber(FindPublisher<Document> finder) {
finder.subscribe(this);
}
@Override
public void onSubscribe(final Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(final Document document) {
System.out.println("Received: " + document);
data.add(document);
subscription.request(1);
}
@Override
public void onError(final Throwable throwable) {
throwable.printStackTrace();
latch.countDown();
}
@Override
public void onComplete() {
System.out.println("Completed");
latch.countDown();
}
public List<Document> getWrappedData() {
return data;
}
public void await() throws Throwable {
await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
public void await(final long timeout, final TimeUnit unit) throws Throwable {
if (!latch.await(timeout, unit)) {
System.out.println("Publish timed out");
}
}
}
Can you compare this implementation of the SettingSubscriber
with yours to see if something is missed?
Upvotes: 1