Reputation: 4719
Given the following two classes:
public class Test {
    public void call() {
        MyClass myClass = new MyClass();
        myClass.methodOne().subscribe(v -> {...});
    }
}

public class MyClass {
    public Observable<String> methodOne() {
        Observable<String> response = Observable.fromFuture(this.methodTwo());
        return response;
    }

    public CompletableFuture<String> methodTwo() {
        CompletableFuture<String> response = new CompletableFuture<>();
        KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(...);
        response.complete("initial value");
        kafkaProducer.write(record, done -> {
            if (done.succeeded()) {
                response.complete("done");
            } else {
                response.complete("not done");
            }
        });
        return response;
    }
}
Where kafkaProducer is an instance of io.vertx.kafka.client.producer.impl.KafkaProducerImpl.
The expected behavior is that when response.complete() has been called in MyClass.methodTwo(), the response value will be returned from methodTwo() to methodOne(). That value will then get wrapped in an Observable and will be available within the handler of subscribe in Test.call().
However, because the write is asynchronous, methodTwo() always returns "initial value", which was set before the write method of the Vert.x kafkaProducer was called.
Even though response is later set within the handler to either "done" or "not done", that value is never returned.
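The behavior described above matches how CompletableFuture works: it can be completed only once, and later complete() calls are ignored. A minimal sketch using only the JDK (the class name CompleteOnceDemo is made up for illustration) shows the first value winning:

```java
import java.util.concurrent.CompletableFuture;

class CompleteOnceDemo {
    // Mirrors the shape of methodTwo(): complete early, then try to complete again.
    static String firstValueWins() {
        CompletableFuture<String> response = new CompletableFuture<>();
        boolean first = response.complete("initial value"); // true: the future is now completed
        boolean second = response.complete("done");         // false: already completed, call is ignored
        return first + "," + second + "," + response.join();
    }

    public static void main(String[] args) {
        System.out.println(firstValueWins()); // prints "true,false,initial value"
    }
}
```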
I have tried changing the code in methodTwo to the following:
AsyncResult<RecordMetadata> res =
    Single.create((SingleEmitter<AsyncResult<RecordMetadata>> emitter) ->
            producer.write(record, result -> emitter.onSuccess(result)))
        .blockingGet();
with the idea of then returning the value in AsyncResult, but this blocks indefinitely.
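One plausible explanation for the hang, assuming the calling code runs on the same single thread that has to deliver the write callback (as a Vert.x event loop would), is that the blocked thread can never run the callback it is waiting for. The sketch below reproduces that situation with a plain single-thread executor standing in for the event loop; the class and method names are hypothetical, and a timeout is used so the demo terminates instead of hanging:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SingleThreadBlockDemo {
    // Returns true if waiting on the "loop" thread prevented the callback from running.
    static boolean blocksOnOwnThread() throws Exception {
        ExecutorService loop = Executors.newSingleThreadExecutor(); // stand-in for an event loop
        try {
            Future<Boolean> outcome = loop.submit(() -> {
                CompletableFuture<String> result = new CompletableFuture<>();
                // The "callback" is queued on the same thread that is about to block.
                loop.execute(() -> result.complete("done"));
                try {
                    result.get(200, TimeUnit.MILLISECONDS); // blocking wait, like blockingGet()
                    return false;
                } catch (TimeoutException e) {
                    return true; // the callback could not run while this thread was blocked
                }
            });
            return outcome.get();
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(blocksOnOwnThread()); // prints "true"
    }
}
```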
What is the correct way to approach this?
Thanks
Upvotes: 1
Views: 2073
Reputation: 534
You can use a Vert.x Handler to handle the result of your async call. Once the async call completes, invoke the handler that was passed in and give it the result.
A small snippet addressing your problem is below.
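import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.kafka.client.producer.KafkaProducerRecord;

public class Test {
    public void call() {
        MyClass myClass = new MyClass();
        myClass.methodTwo(f -> {
            if (f.succeeded()) {
                // do something with f.result()
            } else {
                // handle the failure, e.g. inspect f.cause()
            }
        });
    }
}

public class MyClass {
    public void methodTwo(Handler<AsyncResult<String>> handler) {
        KafkaProducerRecord<String, String> record = KafkaProducerRecord.create(...);
        // No early completion: the handler is called only once the write has finished.
        kafkaProducer.write(record, done -> {
            if (done.succeeded()) {
                // The handler expects an AsyncResult, so wrap the value in a succeeded Future
                handler.handle(Future.succeededFuture("done"));
            } else {
                // A failed Future makes f.succeeded() false in the caller
                handler.handle(Future.failedFuture("not done"));
            }
        });
    }
}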
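If the CompletableFuture-returning signature from the question has to stay, the same idea can be applied there: drop the early complete("initial value") and complete the future only inside the callback. This is a sketch under that assumption, not a drop-in replacement. It uses only the JDK, and fakeWrite is a hypothetical stand-in for kafkaProducer.write that reports success or failure from another thread:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class CallbackBridgeDemo {
    // Hypothetical stand-in for the producer: invokes the callback asynchronously.
    // An empty record is treated as a failed write, purely for demonstration.
    static void fakeWrite(String record, Consumer<Boolean> callback) {
        CompletableFuture.runAsync(() -> callback.accept(!record.isEmpty()));
    }

    static CompletableFuture<String> methodTwo(String record) {
        CompletableFuture<String> response = new CompletableFuture<>();
        // The future stays pending until the callback fires, so the caller sees the real outcome.
        fakeWrite(record, succeeded -> response.complete(succeeded ? "done" : "not done"));
        return response;
    }

    public static void main(String[] args) {
        // React to the result with a continuation; join() only keeps the demo alive until it prints.
        methodTwo("payload").thenAccept(System.out::println).join(); // prints "done"
    }
}
```

The caller should still consume the returned future without blocking the thread that delivers the callback, for example through thenAccept as in main above.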
Upvotes: 4