Mulgard
Mulgard

Reputation: 10609

Subscribers onnext does not contain complete item

We are working with project reactor and having a huge problem right now. This is how we produce (publish our data):

public Flux<String> getAllFlux() {
    return Flux.<String>create(sink -> {
        new Thread(){
            public void run(){
                Iterator<Cache.Entry<String, MyObject>> iterator = getAllIterator();
                ObjectMapper mapper = new ObjectMapper();

                while(iterator.hasNext()) {
                    try {
                        sink.next(mapper.writeValueAsString(iterator.next().getValue()));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }

                sink.complete();
            }
        } .start();
    });
}

As you can see we are taking data from an iterator and are publishing each item in that iterator as a json string. Our subscriber does the following:

flux.subscribe(new Subscriber<String>() {
    private Subscription s;

    int amount = 1; // the amount of received flux payload at a time
    int onNextAmount;

    String completeItem="";

    ObjectMapper mapper = new ObjectMapper();

    @Override
    public void onSubscribe(Subscription s) {
        System.out.println("subscribe");

        this.s = s;
        this.s.request(amount);
    }

    @Override
    public void onNext(String item) {
        MyObject myObject = null;

        try {
            System.out.println(item);

            myObject = mapper.readValue(completeItem, MyObject.class);

            System.out.println(myObject.toString());
        } catch (IOException e) {
            System.out.println(item);
            System.out.println("failed: " + e.getLocalizedMessage());
        }

        onNextAmount++;

        if (onNextAmount % amount == 0) {
            this.s.request(amount);
        }
    }

    @Override
    public void onError(Throwable t) {
        System.out.println(t.getLocalizedMessage())
    }

    @Override
    public void onComplete() {
        System.out.println("completed");
    });
}

As you can see we are simply printing the String item which we receive and parsing it into an object using jackson wrapper. The problem we got now is that for most of our items everything works fine:

{"itemId": "someId", "itemDesc", "some description"}

But for some items the String is cut off like this for example:

{"itemId": "some"

And the next item after that would be

"Id", "itemDesc", "some description"}

There is no pattern for those cuts. It is completely random and it is different everytime we run that code. Ofcourse our jackson is gettin an error Unexpected end of Input with that behaviour.

So what is causing such a behaviour and how can we solve it?

Upvotes: 0

Views: 64

Answers (1)

Mulgard
Mulgard

Reputation: 10609

Solution:

Send the Object inside the flux instead of the String:

public Flux<ItemIgnite> getAllFlux() {
   return Flux.create(sink -> {
       new Thread(){
           public void run(){
               Iterator<Cache.Entry<String, ItemIgnite>> iterator = getAllIterator();

               while(iterator.hasNext()) {
                   sink.next(iterator.next().getValue());
               }
           }
       } .start();
   });
} 

and use the following produces type:

@RequestMapping(value="/allFlux", method=RequestMethod.GET, produces="application/stream+json")

The key here is to use stream+json and not only json.

Upvotes: 1

Related Questions