Pavel Mihnenkov

Reputation: 3

Stream returns wrong type

I'm trying to understand the reactive style, but I'm stuck on this example.

public class ScriptServiceImpl implements ScriptService{

    private static Logger log = LoggerFactory.getLogger(ScriptServiceImpl.class);
    private final ScriptEngineManager manager = new ScriptEngineManager();
    private final ScriptEngine engine = manager.getEngineByName("JavaScript");

    @Override
    public Flux<MyFunctionResult> evaluate(MyFunction myFunction, Integer iterations){
        Flux<MyFunctionResult> flux = Flux.empty();
        flux.mergeWith(
                Flux.range(1,iterations)
                .map(counter -> {
                    engine.put("parametr", counter);
                    try {
                        long start = System.currentTimeMillis();
                        String functionResult = engine.eval(myFunction.getSource()).toString();
                        long timer = System.currentTimeMillis() - start;

                        return Mono.just(new MyFunctionResult(timer, functionResult, myFunction.getNumber(), counter));
                    } catch (ScriptException ex) {
                        return Mono.error(ex);
                    }
                })
        );
        return flux;
    }
}

I want to return a Flux of MyFunctionResult, but I get a Flux of Object in the Flux.mergeWith section. What am I doing wrong?

Upvotes: 0

Views: 69

Answers (1)

Alex

Reputation: 5924

There are multiple issues here:

  • You don't need to wrap MyFunctionResult in a Mono: map expects a non-reactive return type. As a result, instead of returning Mono.error, you should wrap the checked exception in an unchecked RuntimeException and throw it.
  • You need to return the result of flux.mergeWith, not flux itself, because operators return a new Flux rather than modifying the one they are called on. In general, though, this example doesn't need mergeWith at all.
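To make both points concrete, here is a minimal sketch. It assumes Reactor is on the classpath, and the class and method names (MapVsFlatMap, withMap, withFlatMap, discardedMerge) are made up for illustration; they are not from the question. It shows that map takes a lambda returning a plain value, that flatMap is the operator to use when the lambda returns a Mono, and that ignoring the return value of mergeWith leaves the original Flux unchanged.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;

public class MapVsFlatMap {

    // map: the lambda returns a plain value, so the result is Flux<Integer>.
    static Flux<Integer> withMap() {
        return Flux.range(1, 3).map(i -> i * i);
    }

    // flatMap: the lambda returns a Publisher (here a Mono), which gets flattened.
    static Flux<Integer> withFlatMap() {
        return Flux.range(1, 3).flatMap(i -> Mono.just(i * i));
    }

    // Operators do not mutate the receiver: the merged Flux is discarded here,
    // so the returned Flux is still the empty one.
    static Flux<Integer> discardedMerge() {
        Flux<Integer> flux = Flux.empty();
        flux.mergeWith(Flux.range(1, 3)); // return value ignored on purpose
        return flux;
    }

    public static void main(String[] args) {
        List<Integer> mapped = withMap().collectList().block();
        List<Integer> flattened = withFlatMap().collectList().block();
        List<Integer> discarded = discardedMerge().collectList().block();
        System.out.println("map:       " + mapped);
        System.out.println("flatMap:   " + flattened);
        System.out.println("discarded: " + discarded);
    }
}
```

If the lambda really had to return a Mono, flatMap (or concatMap, when order matters) would be the operator to reach for; with map, the Mono itself becomes the element type.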

Your code could be rewritten as:

return Flux.range(1,iterations)
        .map(counter -> {
            engine.put("parametr", counter);
            try {
                long start = System.currentTimeMillis();
                String functionResult = engine.eval(myFunction.getSource()).toString();
                long timer = System.currentTimeMillis() - start;

                return new MyFunctionResult(timer, functionResult, myFunction.getNumber(), counter);
            } catch (ScriptException ex) {
                throw Exceptions.propagate(ex);
            }
        });

In addition, I'm not sure about engine.eval, but if it is blocking code, consider wrapping it and running it on a separate scheduler; see "How Do I Wrap a Synchronous, Blocking Call?" in the Reactor reference documentation.
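A rough sketch of that wrapping pattern, assuming Reactor is on the classpath. blockingEval is a hypothetical stand-in for a blocking call such as engine.eval, and BlockingWrapSketch is an invented class name. Mono.fromCallable defers the call until subscription and accepts a lambda that throws checked exceptions, and subscribeOn(Schedulers.boundedElastic()) moves the work onto a scheduler intended for blocking tasks.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.List;

public class BlockingWrapSketch {

    // Hypothetical stand-in for a blocking call such as engine.eval(...).
    static String blockingEval(int counter) throws Exception {
        Thread.sleep(10); // simulate blocking work
        return "result-" + counter;
    }

    // Each blocking call is deferred with fromCallable and executed on
    // boundedElastic. concatMap subscribes to one inner Mono at a time,
    // so results keep the order of the counters.
    static Flux<String> evaluate(int iterations) {
        return Flux.range(1, iterations)
                .concatMap(counter -> Mono.fromCallable(() -> blockingEval(counter))
                        .subscribeOn(Schedulers.boundedElastic()));
    }

    public static void main(String[] args) {
        List<String> results = evaluate(3).collectList().block();
        System.out.println(results);
    }
}
```

concatMap is used here rather than flatMap because it runs the inner calls one after another, which is the safer choice if the wrapped resource (a shared script engine, for instance) may not tolerate concurrent use. If the calls are independent, flatMap would allow them to overlap.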

Upvotes: 1
