Aviad
Aviad

Reputation: 1549

Reactive stream backpressure with spring reactor project

I have research and read documents by they are not very understandable. What i am trying to achieve is the following functionality:

I am using Spring Reactor project and using the eventBus. My event bus is throwing event to module A.

Module A should receive the event and insert into Hot Stream that will hold unique values. Every 250 Milisecons the stream should pull all value and make calulcation on them.. and so on.

For example: The eventBus is throwing event with number: 1,2,3,2,3,2

The Stream should get and hold unique values -> 1,2,3 After 250 miliseconds the stream should print the number and empty values

Anyone has an idea how to start? I tried the examples but nothing really works and i guess i don't understand something. Anyone has an example?

Tnx

EDIT:

When trying to do the next i always get exception:

        Stream<List<Integer>> s = Streams.wrap(p).buffer(1, TimeUnit.SECONDS);

        s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i));

        for (int i = 0; i < 10000; i++) {
            p.onNext(i);
        }

The exception:

java.lang.IllegalStateException: The environment has not been initialized yet
    at reactor.Environment.get(Environment.java:156) ~[reactor-core-2.0.7.RELEASE.jar:?]
    at reactor.Environment.timer(Environment.java:184) ~[reactor-core-2.0.7.RELEASE.jar:?]
    at reactor.rx.Stream.getTimer(Stream.java:3052) ~[reactor-stream-2.0.7.RELEASE.jar:?]
    at reactor.rx.Stream.buffer(Stream.java:2246) ~[reactor-stream-2.0.7.RELEASE.jar:?]
    at com.ta.ng.server.controllers.user.UserController.getUsersByOrgId(UserController.java:70) ~[classes/:?]

As you can see i cannot proceed trying without passing this issue.

BY THE WAY: This is happeing only when i use buffer(1, TimeUnit.SECONDS) If i use buffer(50) for example it works.. Although this is not the final solution its a start.

Upvotes: 1

Views: 801

Answers (1)

Aviad
Aviad

Reputation: 1549

Well after reading doc again i missed this:

static {
        Environment.initialize();
    }

This solved the problem. Tnx

Upvotes: 0

Related Questions