Johan
Johan

Reputation: 76724

How to perform a sort + indexing whilst removing duplicates at the same time?

I use the following code to sort both items and an index: There are about 256*1024*1024 entries in the values and index arrays.
The uncompressed values array occupies about 10GB, which is why I want to compress it by grouping together duplicate values of which there are many.

I've come up with the following proof of concept code, but there is a problem in the Compress method. Every time it finds a duplicate it performs a search in the index which costs O(N2) time.

I need to keep an index, because I need to be able to access elements in the array as if no de-duplication took place.

This is done, using a simple default property array property, thus mimicking the original array:

function TLookupTable.GetItems(Index: integer): TSlice;
begin   
  Result:= FData[FIndex[Index]];
end;

The proof of concept code (which is dog slow) is as follows.

TMyArray = class
  class procedure QuickSort<T,Idx>(var Values: array of T; var Index: array of Idx; const Comparer: IComparer<T>;
  L, R: Integer);
  class procedure Compress<T>(const Values: array of T; var Index: array of Integer; out CompressedValues: TArray<T>; const Comparer: IComparer<T>);
end;

class procedure TMyArray.QuickSort<T,Idx>(var Values: array of T; var Index: array of Idx; const Comparer: IComparer<T>;
  L, R: Integer);
var
  I, J: Integer;
  pivot, temp: T;
  TempIdx: Idx;
begin
  if (Length(Values) = 0) or ((R - L) <= 0) then
    Exit;
  repeat
    I := L;
    J := R;
    pivot := Values[L + (R - L) shr 1];
    repeat
      while Comparer.Compare(Values[I], pivot) < 0 do Inc(I);
      while Comparer.Compare(Values[J], pivot) > 0 do Dec(J);
      if I <= J then begin
        if I <> J then begin
          temp := Values[I];
          Values[I] := Values[J];
          Values[J] := temp;

          //Keep the index in sync
          tempIdx := Index[I];
          Index[I] := Index[J];
          Index[J] := tempIdx;
        end;
        Inc(I);
        Dec(J);
      end;
    until I > J;
    if L < J then QuickSort<T,Idx>(Values, Index, Comparer, L, J);
    L := I;
  until I >= R;
end;

class procedure TMyArray.Compress<T>(const Values: array of T; var Index: array of integer; out CompressedValues: TArray<T>; const Comparer: IComparer<T>);
var
  i,j: integer;
  Count: integer;
  Duplicate: integer;
begin
  Count:= 256*1024*1024;
  SetLength(CompressedValues, Count);
  CompressedValues[0]:= Values[0];
  Duplicate:= 0;
  for i := 1 to High(Values) do begin
    //Compress duplicate values
    if Comparer.Compare(Values[i], CompressedValues[Duplicate]) = 0 then begin
      //Search for the indexed item
      //Very time consuming: O(N*N)
      for j:= i to High(Index) do if Index[j] = i then begin
        Index[j]:= Duplicate;  //Fix up the index
        Break;
      end; {for j}
    end else begin
      Inc(Duplicate);
      if Duplicate >= Count  then begin
        Inc(Count, 1024*1024);
        SetLength(CompressedValues, Count);
      end;
      CompressedValues[Duplicate]:= Values[i];
    end; 
  end; {for i}
  SetLength(CompressedValues, Duplicate+1)
end;

How can I speed up the compress step so that it takes O(N) time?

If there is a way to both keep the index and remove duplicates and sort, all at the same time, that would be great. My answer below splits the sort and the dedup into two separate stages.

Upvotes: 2

Views: 223

Answers (1)

Johan
Johan

Reputation: 76724

The trick is to leave the original data array alone and just sort the index. We can then exploit the fact that the original data is in the correct order and use that to build a new index.

In addition that means that we no longer need a custom Sort function; it also moves a lot less data around.

Create the index like so:

  FIndex: TArray<integer>;
  ....
  SetLength(FIndex, Length(FAllData));
  for i:= 0 to count-1 do begin
    FIndex[i]:= i;
  end;
  TArray.Sort<Integer>(FIndex, TDelegatedComparer<integer>.Construct(
  function(const Left, Right: Integer): Integer
  begin
    if FAllData[Left] > FAllData[Right] then Exit(1);
    if FAllData[Left] < FAllData[Right] then Exit(-1);
    Result:= 0;
  end));

Change the TMyArray class like so:

TMyArray = class
  class procedure Compress<T>(const Values: array of T; var Index: TArray<integer>; out CompressedValues: TArray<T>; const Comparer: IComparer<T>);
end;

class procedure TMyArray.Compress<T>(const Values: array of T; var Index: TArray<integer>; out CompressedValues: TArray<T>; const Comparer: IComparer<T>);
const
  Equal = 0;
var
  i,j: integer;
  Count: integer;
  Duplicate: integer;
  IndexEntry: integer;
  OutIndex: TArray<integer>;
begin

  Count:= 16*1024*1024;  //Start with something reasonable
  SetLength(CompressedValues, Count);
  SetLength(OutIndex, Length(Index));
  Duplicate:= 0;
  CompressedValues[0]:= Values[Index[0]];
  OutIndex[Index[0]]:= 0;
  for i:= 1 to High(Index) do begin
    if Comparer.Compare(Values[Index[i]], CompressedValues[Duplicate]) = Equal then begin
      OutIndex[Index[i]]:= Duplicate;
    end else begin
      Inc(Duplicate);
      //Grow as needed
      if (Duplicate >= Length(CompressedValues)) then SetLength(CompressedValues, Length(CompressedValues) + 1024*1024);
      CompressedValues[Duplicate]:= Values[Index[i]];
      OutIndex[Index[i]]:= Duplicate;
    end;
  end;
  Index:= OutIndex;
  SetLength(CompressedValues, Duplicate+1);
end;

Upvotes: 3

Related Questions