user1052610
user1052610

Reputation: 4719

Returning value from asynchronous handler to containing method

Given the following two classes:

public class Test {

    public void call() {
        MyClass myClass = new MyClass();

        myClass.methodOne().subscribe(v -> {...});
    }
}

public class MyClass {

    public Observable<String> methodOne() {
        Observable<String> response =  Observable.fromFuture(this.methodTwo());
        return response;
    }

    public CompletableFuture<String> methodTwo() {
        CompletableFuture<String> response = new CompletableFuture<>();
        KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(...);

        response.complete("initial value");

        kafkaProducer.write(record, done -> {
            if(done.succeeded()) {
                response.complete("done");
            }
            else {
                response.complete("not done");
            };
        );

        return response;
    }
}

Where kafkaProducer is an instance of io.vertx.kafka.client.producer.impl.KafkaProducerImpl.

The expected behavior is that when response.complete() has been called in MyClass.methodTwo(), the response value will be returned from methodTwo() to methodOne(). That value will then get wrapped in a future and will be available within the handler of subscribe in Test.call().

However, because of the asynchronous processing methodTwo() will always return "initial value" which was set before the write method of the vertx. kafkaProducer.

Even though at some later point response will be set within the handler to either "done" or "not done" that value is never returned.

I have tried changing the code in methodTwo to the following:

AsyncResult<RecordMetadata> res = 
Single.create((SingleEmitter<AsyncResult<RecordMetadata>> emitter) ->
   producer.write(record,   result -> emitter.onSuccess(result)))
   .blockingGet();

with the idea of then returning the value in AsyncResult, but this blocks indefinitely.

What is the correct way to approach this?

Thanks

Upvotes: 1

Views: 2073

Answers (1)

Will_Panda
Will_Panda

Reputation: 534

You can use Vert.x Handler to handle the result of your async call. Once the async call gets completed you can call the handler which got passed with the result.

A small snippet relating to address your problem is below.

public class Test {

    public void call() {
        MyClass myClass = new MyClass();

        myClass.methodTwo(f-> {
            if (f.succeeded()) {
                //do something with f.result()
            }
            else {
                //handle;
            }
        });
    }
}

public class MyClass {

    public void methodTwo(Handler<AsyncResult<String>> handler) {
        KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(...);

        //response.complete("initial value");

        kafkaProducer.write(record, done -> {
            if(done.succeeded()) {
                handler.handle("done");
            }
            else {
                handler.handle("not done");
            };
        );

        //return response;
    }
}

Upvotes: 4

Related Questions