Armin

Reputation: 471

RxJava group sorted data

I have a very large collection of data, and the raw data is already sorted (or at least grouped, so that all rows with the same key are adjacent) by what is going to be the key. For example, I have a CSV file whose first column is the key for the grouping.

A,x,x,x
A,x,y,x
A,z,y,y
C,x,s,d
C,t,d,s
B,a,s,a
E,x,x,x
E,t,r,y

These lines are converted into objects, put into a list and streamed using an RxJava Flowable. Because this CSV is going to be huge (so large that loading it all at once crashes the application), is there a good way to convert these objects into one map entry per key, which would look like this:

{ 'A': [[x,x,x],[x,y,x],[z,y,y]] }
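The grouping can be done in a streaming fashion only because rows with the same key are contiguous: a group can be emitted as soon as the key changes, so only one group is held in memory at a time. The following is a minimal plain-Java sketch of that idea with no RxJava involved. The class name ContiguousGrouper and its accept/finish methods are hypothetical; from a Flowable subscriber you would presumably call accept from onNext and finish from onComplete.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper: groups consecutive CSV lines that share the same first
// column. Only the group currently being built is kept in memory; it is passed
// to the sink as soon as a line with a different key arrives. This relies on
// the input being grouped by key, as in the sample CSV.
final class ContiguousGrouper {
    private final Consumer<Map.Entry<String, List<List<String>>>> sink;
    private String currentKey;
    private List<List<String>> currentRows = new ArrayList<>();

    ContiguousGrouper(Consumer<Map.Entry<String, List<List<String>>>> sink) {
        this.sink = sink;
    }

    // Feed one raw line such as "A,x,y,x".
    void accept(String line) {
        String[] cols = line.split(",");
        String key = cols[0];
        if (currentKey != null && !currentKey.equals(key)) {
            flush(); // key changed: the previous group is complete
        }
        currentKey = key;
        // Everything after the key column becomes one row of the group
        currentRows.add(new ArrayList<>(Arrays.asList(cols).subList(1, cols.length)));
    }

    // Emit the last group; call once after the final line.
    void finish() {
        if (currentKey != null) {
            flush();
        }
    }

    private void flush() {
        sink.accept(new AbstractMap.SimpleImmutableEntry<>(currentKey, currentRows));
        currentKey = null;
        currentRows = new ArrayList<>();
    }
}
```

Keys are only compared for equality, never for order, so the sketch also works for data that is grouped but not sorted (C comes before B in the sample).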

Upvotes: 4

Views: 217

Answers (2)

Armin

Reputation: 471

I'm now using FlowableTransformers.partialCollect from RxJava2Extensions. For example:

import hu.akarnokd.rxjava2.operators.FlowableTransformers;
import hu.akarnokd.rxjava2.operators.PartialCollectEmitter;
import io.reactivex.Flowable;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.functions.Functions;

// LineData, ListBuilder, ListDataModel, reader, settings and file are
// application classes/objects that are not shown here.
Flowable.fromPublisher(FlowableTransformers.partialCollect(
        (Consumer<PartialCollectEmitter<LineData, Integer,
        ListBuilder, ListDataModel>>) emitter -> {
            // Get or initialize the collecting object
            ListBuilder lb = emitter.getAccumulator();
            if (lb == null) {
                lb = new ListBuilder();
                emitter.setAccumulator(lb);
            }

            if (emitter.demand() != 0) {
                if (emitter.size() != 0) {
                    // Consume buffered items first, even if upstream has
                    // already completed; otherwise they would be dropped
                    LineData data = emitter.getItem(0);
                    emitter.dropItems(1);

                    // add returns the finished model if the key changes
                    ListDataModel model = lb.add(data);

                    if (model != null) {
                        emitter.next(model);
                    }
                } else if (emitter.isComplete()) {
                    if (!lb.isEmpty()) {
                        // clear returns the last model
                        emitter.next(lb.clear());
                    }
                    emitter.complete();
                    return;
                }
            }
            emitter.setIndex(0);
        }, Functions.emptyConsumer(), settings.getReadBufferSize() + 1).apply(
                Flowable.fromIterable(file.getFileNameList())
                        .concatMap(
                                fileName -> reader
                                        .getLineData(fileName)
                                        .buffer(settings.getReadBufferSize()))
                        .flatMap(Flowable::fromIterable)))
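The answer does not show ListBuilder or the model classes. Below is a hypothetical sketch of a ListBuilder that satisfies the contract the handler relies on: add returns the finished model when the key changes and null otherwise, clear returns the group still being built, and isEmpty reports whether anything is pending. LineData and ListDataModel here are simplified stand-ins assumed for illustration, not the author's classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a parsed CSV line.
final class LineData {
    final String key;            // first CSV column
    final List<String> values;   // remaining columns

    LineData(String key, List<String> values) {
        this.key = key;
        this.values = values;
    }
}

// Hypothetical stand-in for one finished group (one map entry).
final class ListDataModel {
    final String key;
    final List<List<String>> rows;

    ListDataModel(String key, List<List<String>> rows) {
        this.key = key;
        this.rows = rows;
    }
}

// Hypothetical ListBuilder: holds only the group currently being built.
final class ListBuilder {
    private String key;
    private List<List<String>> rows = new ArrayList<>();

    // Returns the previous group if this line starts a new key, else null.
    ListDataModel add(LineData data) {
        ListDataModel finished = null;
        if (key != null && !key.equals(data.key)) {
            finished = clear(); // key changed: close the previous group
        }
        key = data.key;
        rows.add(data.values);
        return finished;
    }

    boolean isEmpty() {
        return rows.isEmpty();
    }

    // Returns the pending group and resets the builder.
    ListDataModel clear() {
        ListDataModel model = new ListDataModel(key, Collections.unmodifiableList(rows));
        key = null;
        rows = new ArrayList<>();
        return model;
    }
}
```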

Upvotes: 1

Dave Moten

Reputation: 12097

Use Transformers.collectWhile from rxjava2-extras and supply a collection factory that produces a special keyed object:


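import com.github.davidmoten.rx2.flowable.Transformers;
import java.util.ArrayList;
import java.util.List;

// K and Value stand for your key type and row type
class Keyed {
    K key;  // set when the first value is added
    final List<Value> list = new ArrayList<>();
}

K key(Value value) {
 ...
}

source.compose(
  Transformers.
    collectWhile(
      // factory: a new, empty group
      () -> new Keyed(),
      // add: record the key of the first value, then append the value
      (keyed, x) -> {
          if (keyed.list.isEmpty()) {
              keyed.key = key(x);
          }
          keyed.list.add(x);
          return keyed; },
      // condition: keep collecting while the key is unchanged
      (keyed, x) ->
         keyed.list.isEmpty() ||
         key(x).equals(keyed.key)));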
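To see what the factory, add and condition arguments do, here is an in-memory illustration in plain Java that applies them to a list. It is not the rxjava2-extras implementation; it assumes the operator emits the current collection when condition returns false, starts a new collection with that item, and emits the remainder at the end. Group, key and groupByFirstColumn are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

// Hypothetical keyed collection for the CSV case; key is set from the first line added.
final class Group {
    String key;
    final List<String> lines = new ArrayList<>();
}

final class CollectWhileDemo {
    // In-memory illustration of factory/add/condition, NOT the library code.
    static <T, R> List<R> collectWhile(List<T> items, Supplier<R> factory,
                                       BiFunction<R, T, R> add, BiPredicate<R, T> condition) {
        List<R> out = new ArrayList<>();
        R current = factory.get();
        boolean hasItems = false;
        for (T item : items) {
            if (hasItems && !condition.test(current, item)) {
                out.add(current);          // condition failed: emit the finished group
                current = factory.get();   // and start a new one with this item
            }
            current = add.apply(current, item);
            hasItems = true;
        }
        if (hasItems) {
            out.add(current);              // emit the remainder at the end
        }
        return out;
    }

    // Hypothetical key function: the first CSV column.
    static String key(String line) {
        return line.substring(0, line.indexOf(','));
    }

    static List<Group> groupByFirstColumn(List<String> lines) {
        return collectWhile(
                lines,
                Group::new,
                (g, line) -> {
                    if (g.lines.isEmpty()) {
                        g.key = key(line); // remember the key of the first line
                    }
                    g.lines.add(line);
                    return g;
                },
                (g, line) -> g.lines.isEmpty() || key(line).equals(g.key));
    }
}
```

The isEmpty() clause in the condition is what lets a fresh group accept its first item before its key is known.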

Upvotes: 0
