Reputation: 3
I'm trying to understand reactive style. But stuck on this example.
public class ScriptServiceImpl implements ScriptService{
private static Logger log = LoggerFactory.getLogger(ScriptServiceImpl.class);
private final ScriptEngineManager manager = new ScriptEngineManager();
private final ScriptEngine engine = manager.getEngineByName("JavaScript");
@Override
public Flux<MyFunctionResult> evaluate(MyFunction myFunction, Integer iterations){
Flux<MyFunctionResult> flux = Flux.empty();
flux.mergeWith(
Flux.range(1,iterations)
.map(counter -> {
engine.put("parametr", counter);
try {
long start = System.currentTimeMillis();
String functionResult = engine.eval(myFunction.getSource()).toString();
long timer = System.currentTimeMillis() - start;
return Mono.just(new MyFunctionResult(timer, functionResult, myFunction.getNumber(), counter));
} catch (ScriptException ex) {
return Mono.error(ex);
}
})
);
return flux;
}
}
I want to return Flux
of MyFunctionResult
but get Flux
of Object
in Flux.mergeWith
section. What am i doing wrong?
Upvotes: 0
Views: 69
Reputation: 5924
There are multiple issues here
MyFunctionResult
into Mono
. map
expects none-reactive return type. As result, instead of Mono.error
you should just wrap checked exception into unchecked RuntimeException
.flux.mergeWith
and not flux
. But in general for this example you don't need mergeWith
Your code could be converted into
return Flux.range(1,iterations)
.map(counter -> {
engine.put("parametr", counter);
try {
long start = System.currentTimeMillis();
String functionResult = engine.eval(myFunction.getSource()).toString();
long timer = System.currentTimeMillis() - start;
return new MyFunctionResult(timer, functionResult, myFunction.getNumber(), counter);
} catch (ScriptException ex) {
throw Exceptions.propagate(ex);
}
});
In addition, not sure about engine.eval
but in case this is blocking code consider wrapping it and run on a separate scheduler How Do I Wrap a Synchronous, Blocking Call?
Upvotes: 1