Japheth Odonya

Reputation: 333

How do I make View's asList() sortable in Google Dataflow SDK?

We are having trouble getting the asList() method to return a sorted list.

We thought we could do this by extending the View class and overriding the asList method, but the View class has a private constructor, so that is not possible.

Our other attempt was to fork the Google Dataflow code on GitHub and modify the PCollectionViews class to return a sorted list by using the Collections.sort method, as shown in the code snippet below:

@Override
protected List<T> fromElements(Iterable<WindowedValue<T>> contents) {
    Iterable<T> itr = Iterables.transform(
        contents,
        new Function<WindowedValue<T>, T>() {
          @SuppressWarnings("unchecked")
          @Override
          public T apply(WindowedValue<T> input){
            return input.getValue();
          }
        });

    LOG.info("#### About to start sorting the list !");
    List<T> tempList = new ArrayList<T>();
    for (T element : itr) {
      tempList.add(element);
    }
    Collections.sort((List<? extends Comparable>) tempList);
    LOG.info("##### List should now be sorted !");
    return ImmutableList.copyOf(tempList);
}

Note that we are now sorting the list.
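
The snippet above relies on an unchecked cast to `List<? extends Comparable>`, so a non-comparable element type would only be caught at run time. As a minimal sketch in plain Java (the names `SortHelper` and `toSortedList` are hypothetical and not part of the Dataflow SDK), a bounded type parameter lets the compiler enforce comparability instead:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; not part of the Dataflow SDK.
class SortHelper {
  // The bound on T makes the compiler reject element types that are not comparable.
  static <T extends Comparable<? super T>> List<T> toSortedList(Iterable<T> elements) {
    List<T> copy = new ArrayList<>();
    for (T element : elements) {
      copy.add(element);
    }
    Collections.sort(copy);
    // Hand back a read-only view so callers cannot disturb the ordering.
    return Collections.unmodifiableList(copy);
  }

  public static void main(String[] args) {
    List<Integer> input = new ArrayList<>();
    input.add(3);
    input.add(1);
    input.add(2);
    System.out.println(toSortedList(input)); // prints [1, 2, 3]
  }
}
```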

This seemed to work when run with the DirectPipelineRunner, but when we tried the BlockingDataflowPipelineRunner, the code change did not appear to be executed.

Note: We recompiled the Dataflow SDK and used it in our project, but this did not work.

How can we get a sorted list from the asList method call?

Upvotes: 0

Views: 415

Answers (2)

Kenn Knowles

Reputation: 6023

The classes in PCollectionViews are not intended for extension. Only the primitive view types provided by View.asSingleton, View.asIterable, View.asList, View.asMap, and View.asMultimap are supported.

To obtain a sorted list from a PCollectionView, you'll need to sort it after you have read it. The following code demonstrates the pattern.

// Assume you have some PCollection
PCollection<MyComparable> myPC = ...;

// Prepare it for side input as a list
final PCollectionView<List<MyComparable>> myView = myPC.apply(View.asList());

// Side input the list and sort it
someOtherValue.apply(
    ParDo.withSideInputs(myView).of(
        new DoFn<A, B>() {
          @Override
          public void processElement(ProcessContext ctx) {
            List<MyComparable> tempList =
                Lists.newArrayList(ctx.sideInput(myView));
            Collections.sort(tempList);
            // do whatever you want with sorted list 
          }
        }));
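
The copy made with `Lists.newArrayList` matters because the list handed out by a side input should be treated as read-only (the modified `fromElements` in the question, for instance, returns an `ImmutableList`). As a plain-Java sketch, using `Collections.unmodifiableList` as a stand-in for the side-input list (an assumption for illustration; `CopyThenSort` and its methods are hypothetical names), sorting in place is rejected while sorting a copy works:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class; no Dataflow dependency.
class CopyThenSort {
  // Stand-in for ctx.sideInput(myView): a read-only list.
  static List<Integer> readOnlySideInput() {
    return Collections.unmodifiableList(Arrays.asList(5, 2, 9, 1));
  }

  // Returns true if sorting the list in place is rejected.
  static boolean inPlaceSortFails(List<Integer> list) {
    try {
      Collections.sort(list);
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }

  // Copies first, then sorts the copy; the input is left untouched.
  static List<Integer> sortedCopy(List<Integer> list) {
    List<Integer> copy = new ArrayList<>(list);
    Collections.sort(copy);
    return copy;
  }

  public static void main(String[] args) {
    List<Integer> sideInput = readOnlySideInput();
    System.out.println(inPlaceSortFails(sideInput)); // prints true
    System.out.println(sortedCopy(sideInput));       // prints [1, 2, 5, 9]
  }
}
```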

Of course, you may not want to sort it repeatedly, depending on the cost of sorting vs the cost of materializing it as a new PCollection, so you can output this value and read it as a new side input without difficulty:

// Side input the list, sort it, and put it in a PCollection
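// `pipeline` is assumed to be your Pipeline object
PCollection<List<MyComparable>> sortedSingleton = pipeline
    .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
    .apply(ParDo.withSideInputs(myView).of(
        new DoFn<Void, List<MyComparable>>() {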
          @Override
          public void processElement(ProcessContext ctx) {
            List<MyComparable> tempList =
                Lists.newArrayList(ctx.sideInput(myView));
            Collections.sort(tempList);
            ctx.output(tempList);
          }
        }));

// Prepare the sorted list for side input as a singleton
final PCollectionView<List<MyComparable>> sortedView =
    sortedSingleton.apply(View.asSingleton());

someOtherValue.apply(
    ParDo.withSideInputs(sortedView).of(
        new DoFn<A, B>() {
          @Override
          public void processElement(ProcessContext ctx) {
            ... ctx.sideInput(sortedView) ...
            // do whatever you want with sorted list 
          }
        }));

You may also be interested in the unsupported sorter contrib module for doing larger sorts using both memory and local disk.

Upvotes: 2

Jonathan Sylvester

Reputation: 1329

We tried to do it the way Kenn Knowles suggested, but there is a problem for large datasets. If tempList is large (so the sort takes measurable time, as it is proportional to O(n * log n)) and there are millions of elements in the "someOtherValue" PCollection, then we are unnecessarily re-sorting the same list millions of times. We should be able to sort ONCE and FIRST, before passing the list to the DoFn in someOtherValue.apply.
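
One way to picture "sort once" is to cache the sorted copy in the function object so that the sort runs only for the first element. The sketch below is plain Java rather than Dataflow code: `SortOnce` and its counter are hypothetical, and it assumes the side input is identical for every element (for example, a single global window), which would not hold for windowed side inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical cache for illustration; not part of the Dataflow SDK.
class SortOnce<T extends Comparable<? super T>> {
  private List<T> sorted; // cached sorted copy, null until first use
  private int sortCount;  // how many times a sort actually ran

  // Sorts on the first call only; later calls return the cached list.
  List<T> get(List<T> sideInput) {
    if (sorted == null) {
      List<T> copy = new ArrayList<>(sideInput);
      Collections.sort(copy);
      sorted = Collections.unmodifiableList(copy);
      sortCount++;
    }
    return sorted;
  }

  int sortCount() {
    return sortCount;
  }

  public static void main(String[] args) {
    SortOnce<Integer> cache = new SortOnce<>();
    List<Integer> sideInput = Arrays.asList(3, 1, 2);
    // Simulate many processElement calls reading the same side input.
    for (int i = 0; i < 1000; i++) {
      cache.get(sideInput);
    }
    System.out.println(cache.sortCount()); // prints 1
  }
}
```

Inside a DoFn, a cached field like this would need to be `transient`, because DoFn instances are serialized before being shipped to workers. The singleton approach in the answer above achieves the same effect at the pipeline level.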

Upvotes: 0
