gscaparrotti
gscaparrotti

Reputation: 740

MongoDB Reactive Streams hangs when performing a query

I'm using the MongoDB Reactive Streams Java API which I implemented following this example, but I'm encountering a serious problem: sometimes, when I try to query a collection, the await methods doesn't work, and it hangs until the timeout is reached.

The onSubscribe methods gets called correctly, but then neither onNext, nor onError nor onComplete get called.

There doesn't seem to be a specific circumstance causing this issue.

This is my code

MongoDatabase database = MongoDBConnector.getClient().getDatabase("myDb");
MongoCollection<Document> collection = database.getCollection("myCollection");
FindPublisher<Document> finder = collection.find(Filters.exists("myField"));    
SettingSubscriber tagSub = new SettingSubscriber(finder); 
//SettingsSubscriber is a subclass of ObservableSubscriber which calls publisher.subscribe(this)
tagSub.await(); //this is where it hangs
return tagSub.getWrappedData();

Upvotes: 3

Views: 1033

Answers (1)

devatherock
devatherock

Reputation: 4971

I wrote a simple implementation of what I assumed the SettingSubscriber looked like and tried to recreate the problem using a groovy script. I couldn't - my code runs without hanging, prints each output record and exits. Code for reference below:

@Grab(group = 'org.mongodb', module = 'mongodb-driver-reactivestreams', version = '4.3.3')
@Grab(group = 'org.slf4j', module = 'slf4j-api', version = '1.7.32')
@Grab(group = 'ch.qos.logback', module = 'logback-classic', version = '1.2.6')

import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.FindPublisher;
import com.mongodb.client.model.Filters;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.CountDownLatch;

MongoClientSettings.Builder clientSettingsBuilder = MongoClientSettings.builder()
        .applyToClusterSettings { clusterSettingsBuilder ->
            clusterSettingsBuilder.hosts( Arrays.asList(new ServerAddress("localhost", 27017)))
        };
MongoClient mongoClient = MongoClients.create(clientSettingsBuilder.build());
MongoDatabase database = mongoClient.getDatabase("myDb");
MongoCollection<Document> collection = database.getCollection("myCollection");
FindPublisher<Document> finder = collection.find(Filters.exists("myField"));
SettingSubscriber tagSub = new SettingSubscriber(finder);
tagSub.await();

class SettingSubscriber implements Subscriber<Document> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private Subscription subscription;
    private List<Document> data = new ArrayList<>();

    public SettingSubscriber(FindPublisher<Document> finder) {
        finder.subscribe(this);
    }

    @Override
    public void onSubscribe(final Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(final Document document) {
        System.out.println("Received: " + document);
        data.add(document);
        subscription.request(1);
    }

    @Override
    public void onError(final Throwable throwable) {
        throwable.printStackTrace();
        latch.countDown();
    }

    @Override
    public void onComplete() {
        System.out.println("Completed");
        latch.countDown();
    }
    
    public List<Document> getWrappedData() {
        return data;
    }

    public void await() throws Throwable {
        await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    public void await(final long timeout, final TimeUnit unit) throws Throwable {
        if (!latch.await(timeout, unit)) {
            System.out.println("Publish timed out");
        }
    }
}

Can you compare this implementation of the SettingSubscriber with yours to see if something is missed?

Upvotes: 1

Related Questions