hmghaly

Reputation: 1502

How to estimate percentiles on streaming data (identifying equally sized bins of numbers in a stream of data in Python)

Peer summary: HMGHaly wants to find the locations of equally spaced percentiles on a data stream. The bins HMGHaly is after should therefore contain roughly the same number of data points, and are therefore not expected to have the same distance between bin boundaries. "Size", as HMGHaly uses it, refers to the number of data points in a bin, not the width of the bin.

I have an iterable of numbers which I cannot fully load into memory, and I want to split these numbers into bins of equal size. That is, if I sorted all these numbers and split them into, for example, 10 groups/bins, what would be the lowest and highest value of each bin?

It is quite easy to get the running mean by counting and summing the numbers seen so far, and equally easy to track the running minimum and maximum, but this kind of splitting seems challenging.

So, the question is: how can I identify such boundary values while processing the numbers as a stream, without storing them all in memory?

Edit: For a simple in-memory list, the sorting/splitting/binning process is as follows:

import random
list1=[random.randint(0,20) for i in range(100)]
list1.sort()
print("full list:",list1)
n_intervals=10
interval_size=int(len(list1)/n_intervals)
for i0 in range(n_intervals):
  small_list1=list1[interval_size*i0:interval_size*(i0+1)]
  bounds=(small_list1[0],small_list1[-1])
  print("small_list # %s"%i0, small_list1,"size:",len(small_list1),"- bounds:",bounds)

Output

full list: [0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 6, 7, 7, 7, 7, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 10, 10, 10, 10, 11, 11, 11, 11, 12, 12, 12, 12, 12, 12, 13, 13, 14, 14, 14, 14, 14, 14, 15, 15, 16, 16, 16, 16, 16, 16, 16, 17, 17, 17, 18, 18, 18, 19, 19, 19, 19, 19, 19, 19, 20, 20, 20]
small_list # 0 [0, 0, 0, 1, 1, 1, 1, 2, 2, 2] size: 10 - bounds: (0, 2)
small_list # 1 [2, 2, 2, 2, 3, 3, 3, 3, 4, 4] size: 10 - bounds: (2, 4)
small_list # 2 [4, 5, 5, 5, 5, 5, 5, 5, 5, 6] size: 10 - bounds: (4, 6)
small_list # 3 [6, 6, 6, 6, 7, 7, 7, 7, 7, 7] size: 10 - bounds: (6, 7)
small_list # 4 [7, 8, 8, 8, 8, 8, 8, 8, 8, 9] size: 10 - bounds: (7, 9)
small_list # 5 [9, 9, 9, 10, 10, 10, 10, 11, 11, 11] size: 10 - bounds: (9, 11)
small_list # 6 [11, 12, 12, 12, 12, 12, 12, 13, 13, 14] size: 10 - bounds: (11, 14)
small_list # 7 [14, 14, 14, 14, 14, 15, 15, 16, 16, 16] size: 10 - bounds: (14, 16)
small_list # 8 [16, 16, 16, 16, 17, 17, 17, 18, 18, 18] size: 10 - bounds: (16, 18)
small_list # 9 [19, 19, 19, 19, 19, 19, 19, 20, 20, 20] size: 10 - bounds: (19, 20)

Further edit: To be fully clear, I need something like the following. It is very easy to get the running mean, min and max, but the question is how to define the boundary values that split all the values seen so far into bins of equal size, updating them as the values stream in, without having to store the values in memory.

import random
random.seed(0)
count0=0
sum0=0
running_min0=None
running_max0=None

def get_bin_boundaries(n_bins=5): #The function I need, it can take any arguments
  return #and return a list of boundary values corresponding to n_bins+1 e.g. [0,3,7,9,11,15]

for i in range(100000000):
  cur_number=random.randint(0,20)
  count0+=1
  sum0+=cur_number
  running_mean0=sum0/count0
  if running_min0 is None or running_min0>cur_number:running_min0=cur_number
  if running_max0 is None or running_max0<cur_number:running_max0=cur_number
  running_bin_boundaries=get_bin_boundaries() #This is what I need
  #print("cur_number",cur_number,"running_mean0",running_mean0,"running_min0",running_min0,"running_max0",running_max0)



Upvotes: 0

Views: 1301

Answers (7)

hmghaly

Reputation: 1502

I have received a considerable number of answers here and elsewhere with many great suggestions. I eventually figured out a solution that is informed by many of these suggestions, so I'm posting it here. The idea is as follows:

  • Iterate over the numbers, and the first number that is non-zero is the reference value
  • For each subsequent number, identify the relative difference between it and the reference value (number - ref) / ref
  • Get the relative difference as a percentage by multiplying with 100
  • Normalize the percentage value around a certain step (e.g. 5% - 10%), so that for example the percentage differences of 3,4,5,6,7 correspond to 5%, while 8,9,10,11,12 would correspond to 10%
  • Now count the number of occurrences of numbers that correspond to each normalized percentage value
  • Based on the original reference value, and the normalized percentage, and the counts of numbers corresponding to the normalized percentage, we can identify raw bins (high and low values with the corresponding count of occurrences)
  • Then we can sort these bins and combine adjacent ones as needed to obtain comparable-size bins

Here is the code I used:


import random
random.seed(0)
count0=0
sum0=0
running_min0=None
running_max0=None

ref_value=None
size0=5 #to normalize the percentage values
diff_counter={}
for i in range(100000):
  cur_number1=random.randint(0,20)
  cur_number2=random.random()
  cur_number=cur_number1*cur_number2
  #cur_number=cur_number2+2
  if cur_number!=0 and ref_value is None: ref_value=cur_number #we identify the first non-zero value as our reference value
  rel_diff=(cur_number-ref_value)/ref_value #we get the relative difference between the current number and reference value
  diff_percent=round(rel_diff*100) #we turn it into a percentage
  normalized_diff_percent=round(diff_percent/size0)*size0 #we normalize the percentage as needed, so that e.g. 5,6,7,8,9,10,11,12,13,14 would correspond to 10
  diff_counter[normalized_diff_percent]=diff_counter.get(normalized_diff_percent,0)+1 #we increment the count of items with this normalized percentage from the reference value

  count0+=1
  sum0+=cur_number
  running_mean0=sum0/count0 #we get the running mean value
  if running_min0 is None or running_min0>cur_number:running_min0=cur_number #and the running minimum value
  if running_max0 is None or running_max0<cur_number:running_max0=cur_number #and the running maximum value

counted_percent_items=list(diff_counter.items()) #now we list the normalized percentage values and their corresponding item count
counted_percent_items.sort()

for a,b in counted_percent_items:
  #cur_ratio=0.01*a
  cur_low_percent0,cur_high_percent0=round(a-0.5*size0),round(a+0.5*size0)
  cur_low_val=(0.01*cur_low_percent0*ref_value)+ref_value
  cur_high_val=(0.01*cur_high_percent0*ref_value)+ref_value
  print("Bin around %s percent - count: %s - boundaries: %s <= X < %s"%(a,b,round(cur_low_val,4),round(cur_high_val,4)))
  #print(a,b,"boundary0,boundary1",cur_low_percent0,cur_high_percent0,"cur_low_val",round(cur_low_val,4),"cur_high_val",round(cur_high_val,4))
print("ref_value",ref_value)
print("running_mean0",round(running_mean0,4),"running_min0",round( running_min0,4),"running_max0",round(running_max0,4))

and here is the output:

Bin around -100 percent - count: 8671 - boundaries: -0.1819 <= X < 0.1819
Bin around -95 percent - count: 7682 - boundaries: 0.1819 <= X < 0.7276
Bin around -90 percent - count: 7251 - boundaries: 0.7276 <= X < 1.0915
Bin around -85 percent - count: 5634 - boundaries: 1.0915 <= X < 1.6372
Bin around -80 percent - count: 5485 - boundaries: 1.6372 <= X < 2.001
Bin around -75 percent - count: 4599 - boundaries: 2.001 <= X < 2.5467
Bin around -70 percent - count: 4560 - boundaries: 2.5467 <= X < 2.9105
Bin around -65 percent - count: 3913 - boundaries: 2.9105 <= X < 3.4563
Bin around -60 percent - count: 3913 - boundaries: 3.4563 <= X < 3.8201
Bin around -55 percent - count: 3527 - boundaries: 3.8201 <= X < 4.3658
Bin around -50 percent - count: 3224 - boundaries: 4.3658 <= X < 4.7296
Bin around -45 percent - count: 3013 - boundaries: 4.7296 <= X < 5.2754
Bin around -40 percent - count: 2798 - boundaries: 5.2754 <= X < 5.6392
Bin around -35 percent - count: 2797 - boundaries: 5.6392 <= X < 6.1849
Bin around -30 percent - count: 2491 - boundaries: 6.1849 <= X < 6.5487
Bin around -25 percent - count: 2530 - boundaries: 6.5487 <= X < 7.0945
Bin around -20 percent - count: 2072 - boundaries: 7.0945 <= X < 7.4583
Bin around -15 percent - count: 2192 - boundaries: 7.4583 <= X < 8.004
Bin around -10 percent - count: 1898 - boundaries: 8.004 <= X < 8.3678
Bin around -5 percent - count: 1905 - boundaries: 8.3678 <= X < 8.9135
Bin around 0 percent - count: 1755 - boundaries: 8.9135 <= X < 9.2774
Bin around 5 percent - count: 1643 - boundaries: 9.2774 <= X < 9.8231
Bin around 10 percent - count: 1491 - boundaries: 9.8231 <= X < 10.1869
Bin around 15 percent - count: 1514 - boundaries: 10.1869 <= X < 10.7326
Bin around 20 percent - count: 1426 - boundaries: 10.7326 <= X < 11.0965
Bin around 25 percent - count: 1278 - boundaries: 11.0965 <= X < 11.6422
Bin around 30 percent - count: 1203 - boundaries: 11.6422 <= X < 12.006
Bin around 35 percent - count: 1081 - boundaries: 12.006 <= X < 12.5517
Bin around 40 percent - count: 1076 - boundaries: 12.5517 <= X < 12.9155
Bin around 45 percent - count: 907 - boundaries: 12.9155 <= X < 13.4613
Bin around 50 percent - count: 865 - boundaries: 13.4613 <= X < 13.8251
Bin around 55 percent - count: 803 - boundaries: 13.8251 <= X < 14.3708
Bin around 60 percent - count: 683 - boundaries: 14.3708 <= X < 14.7346
Bin around 65 percent - count: 683 - boundaries: 14.7346 <= X < 15.2804
Bin around 70 percent - count: 545 - boundaries: 15.2804 <= X < 15.6442
Bin around 75 percent - count: 534 - boundaries: 15.6442 <= X < 16.1899
Bin around 80 percent - count: 465 - boundaries: 16.1899 <= X < 16.5537
Bin around 85 percent - count: 447 - boundaries: 16.5537 <= X < 17.0995
Bin around 90 percent - count: 357 - boundaries: 17.0995 <= X < 17.4633
Bin around 95 percent - count: 358 - boundaries: 17.4633 <= X < 18.009
Bin around 100 percent - count: 198 - boundaries: 18.009 <= X < 18.3728
Bin around 105 percent - count: 224 - boundaries: 18.3728 <= X < 18.9185
Bin around 110 percent - count: 151 - boundaries: 18.9185 <= X < 19.2824
Bin around 115 percent - count: 102 - boundaries: 19.2824 <= X < 19.8281
Bin around 120 percent - count: 56 - boundaries: 19.8281 <= X < 20.1919
ref_value 9.09545283528363
running_mean0 4.9789 running_min0 0.0 running_max0 19.9924

This approach needs very little memory: essentially one dictionary with a counter per normalized percentage. It also gives us a real-time picture of the distribution of values around our reference value. To do:

  • Need to account for the case where the first value(s) are zero, so that they can still be counted relative to the reference value.
  • Need to process the relative percentage bins to get comparable-size bins eventually (a possible sketch follows this list).
  • Need to identify a reasonable way of choosing a good normalization step for the percentages.
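
For the second item, here is a possible sketch (the merge_bins helper and its greedy strategy are just an illustration, not a tested part of the solution above): it walks the sorted (percentage, count) pairs and closes a merged bin whenever the accumulated count reaches total/n_bins, converting the percentage boundaries back to raw values with the same formula as above.

def merge_bins(counted_percent_items, ref_value, size0, n_bins=5):
  total=sum(count for percent,count in counted_percent_items)
  target=total/n_bins #desired number of items per merged bin
  percent_to_value=lambda p:(0.01*p*ref_value)+ref_value #same conversion as in the loop above
  first_percent=counted_percent_items[0][0]
  boundaries=[percent_to_value(round(first_percent-0.5*size0))] #lower edge of the first raw bin
  acc=0
  for percent,count in counted_percent_items:
    acc+=count
    if acc>=target and len(boundaries)<n_bins: #close the current merged bin
      boundaries.append(percent_to_value(round(percent+0.5*size0)))
      acc=0
  last_percent=counted_percent_items[-1][0]
  boundaries.append(percent_to_value(round(last_percent+0.5*size0))) #upper edge of the last raw bin
  return boundaries

print("approximate equal-size bin boundaries:",[round(b,4) for b in merge_bins(counted_percent_items,ref_value,size0)])

A merged bin can overshoot its target by up to one raw bin's count, so a smaller size0 gives more accurate boundaries at the cost of a larger dictionary.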

Upvotes: 0

pandayo

Reputation: 310

If I understood your question correctly, couldn't you use a defaultdict to count the appearances of each value? Afterwards you have a huge dict, but you can iterate over the sorted keys and create your bins from it. You could even recalculate the bins for every new number in your stream, but I would deem this unnecessary.

To put it into code:

import random
from math import ceil
from collections import defaultdict
random.seed(0)
count0=0
sum0=0
running_min0=None
running_max0=None

appearances = defaultdict(int)

def get_bin_boundaries(appearances, count, min_val, max_val, n_bins=5):
    if count < n_bins:
        n_bins = count
    keys = sorted(appearances.keys())
    bin_size = int(ceil(count / n_bins))
    boundaries = [min_val]
    key_index = 0
    removed = 0  # items of the current key already assigned to previous bins
    taken = 0    # items already placed into the current bin
    while len(boundaries) <= n_bins:
        cur_val = appearances[keys[key_index]]
        if cur_val - removed > bin_size - taken:
            # the current key fills this bin and spills over into the next
            removed += (bin_size - taken)
            boundaries.append(keys[key_index])
            taken = 0
            continue
        if cur_val - removed == bin_size - taken:
            # the current key fills this bin exactly
            removed = 0
            taken = 0
            boundaries.append(keys[key_index])
            key_index += 1
            continue
        taken += (cur_val - removed)
        removed = 0
        key_index += 1
        if key_index >= len(keys):
            # out of keys (possible through rounding): pad with the last key
            while len(boundaries) <= n_bins:
                boundaries.append(keys[-1])
    boundaries.append(max_val)
    return boundaries


for i in range(1000000):
    cur_number=random.randint(0,20)
    count0+=1
    sum0+=cur_number
    running_mean0=sum0/count0
    if running_min0 is None or running_min0>cur_number:running_min0=cur_number
    if running_max0 is None or running_max0<cur_number:running_max0=cur_number
    appearances[cur_number]+=1
    if i % 100000 == 0:
        print(i, get_bin_boundaries(appearances, count0, running_min0, running_max0))

Keep in mind that recomputing the boundaries is slow, which is why the loop above only prints them every 100,000 numbers.

Upvotes: 1

Ronald van Elburg

Reputation: 275

The problem cannot be solved exactly on a stream, but given some constraints we can solve it to a good approximation.

It is important to know beforehand in which range the data can almost certainly be found, i.e. an order-of-magnitude estimate of the data values should be known.

Suppose we have data and we know that the majority of the data points is almost certainly in the range [a_min, a_max] then we can:

- bin the data into very narrow bins, creating a histogram in the process
- subsequently calculate the cumulative distribution function
- find the points where the cumulative distribution function reaches the percentiles of interest

In code:

import numpy as np

# Function to check if x is power of 2
# https://www.geeksforgeeks.org/python-program-to-find-whether-a-no-is-power-of-two/
def isPowerOfTwo(n):
    if n == 0:
        return False
    while n != 1:
        if n % 2 != 0:
            return False
        n = n // 2
    return True


class percentileEstimator():
    ''' This algorithm assumes there are 2^N bins separated by 2^N - 1 bin
         boundaries (where N is a natural number).

         We start from the middle bin boundary and binary-search from there
         for a bin boundary neighbouring the correct bin. Then we do a last
         check on the value to decide whether the lower or the higher
         neighbouring bin is the correct bin for the value.

         A bin boundary is included in the neighbouring bin at higher values.
         The first and last bin contain the values before, respectively
         after, the first and last specified bin boundary.

    '''


    def __init__(self, bin_boundaries):
        
        if not isPowerOfTwo(len(bin_boundaries)+1):
            raise ValueError('percentileEstimator: Number of bins is not a power of 2')
            
        self.bin_boundaries = bin_boundaries
        self.bin_count = len(bin_boundaries) + 1
        self.histogram = np.zeros((self.bin_count,), dtype=np.int64)
        self.datapoint_count = 0   
       
    def getBinIndex(self, value):
       
        position = int(self.bin_count/2)  # For mathematical reasons we start positions at 1
        step = int(self.bin_count/4)

        while (step > 0):
            
            if(value < self.bin_boundaries[position-1]):
                position -= step
            else:
                position += step
            
            step = step // 2
            
        # Are we lower or higher than the last bin boundary
        if(value < self.bin_boundaries[position-1]):
            index = position-1
        else:
            index = position
        
        return index

    def update(self, data):
        for datapoint in data:
            index = self.getBinIndex(datapoint)
            self.histogram[index] +=1
        print(self.histogram)

    def getPercentiles(self, percentile_list):
        '''
        Calculate approximate percentile location:
        
            In: 
                percentile_list: list percentiles
            
            Out:
                percentiles: estimated value associated with the percentile
                error_intervals: interval in which we are certain the percentile value can be found
        '''
        
        cumulative_distribution = np.cumsum(self.histogram)
        percentile_locations = list()
                
        if cumulative_distribution[0] > 0:
            print(f'There are {cumulative_distribution[0]} data points below the specified minimum')
            
        if cumulative_distribution[-2] != cumulative_distribution[-1]:
            print(f'There are {cumulative_distribution[-1] - cumulative_distribution[-2]} data points above the specified maximum')
            
        for percentile in percentile_list:
            if percentile <= 1/cumulative_distribution[-1]:
                print(f'percentile requested {percentile} is too small for the data set provided, percentile value should be larger than {1/cumulative_distribution[-1]}')
            elif percentile >= 1-1/cumulative_distribution[-1]:
                print(f'percentile requested {percentile} is too large for the data set provided, percentile value should be smaller than {1-1/cumulative_distribution[-1]}')
        
        for percentile in percentile_list:
            percentile_loc_left = np.searchsorted(cumulative_distribution[1:-2], percentile*cumulative_distribution[-1], side ='left')
            percentile_locations.append(percentile_loc_left)                             
        
        percentiles = np.array([(self.bin_boundaries[location]+self.bin_boundaries[location+1])/2 for location in percentile_locations])
        
        error_intervals = np.array([[self.bin_boundaries[location], self.bin_boundaries[location+1]] for location in percentile_locations])
                
        return percentiles,  error_intervals
    

# test the class a bit

def test_index_function():
    pE = percentileEstimator(np.array([0, 13, 27], 'float'))
    values = [-1.0, 0, 0.5, 13, 13.5, 27, 27.5, 12.9]
    desired_outcomes = np.array([0, 1, 1, 2, 2, 3, 3, 1])
    actual_outcomes = np.array([pE.getBinIndex(value) for value in values])
    np.testing.assert_equal(actual_outcomes, desired_outcomes)

test_index_function()

def test1_getpercentile_function():
    pE = percentileEstimator(np.array([1,2,3,4,5,6,7], 'float'))
    
    values = np.array([2.4,]*40 + [5.1]*50 + [6.5]*10)
    
    pE.update(values)
    
    percentiles = [0.3,0.4,0.5,0.9]
    percentile_values, error_intervals = pE.getPercentiles(percentiles)
    
    print(f'{percentile_values=}')
    
    percentile_values_expected = np.array([2.5, 2.5, 5.5, 5.5])
    
    error_intervals_expected = np.array([[2., 3.],
       [2., 3.],
       [5., 6.],
       [5., 6.]])
    
    np.testing.assert_equal(percentile_values, percentile_values_expected)
    np.testing.assert_equal(error_intervals, error_intervals_expected)
                   
test1_getpercentile_function() 


a_min = 0
a_max = 10
step = (a_max-a_min)/510

bin_boundaries = np.arange(a_min, a_max+step,step)

pE = percentileEstimator(bin_boundaries)
test_data_mean = 7
test_data_sigma = 0.5
test_data = np.random.randn(1000)*test_data_sigma + test_data_mean


pE.update(test_data)

percentiles, error_intervals = pE.getPercentiles([0.1, 0.5, 0.9,])

print(f'{percentiles=}\n{ error_intervals=}')

For me this produces something like:

percentiles=array([6.30392157, 6.99019608, 7.6372549 ])
error_intervals=array([[6.29411765, 6.31372549],
                       [6.98039216, 7.        ],
                       [7.62745098, 7.64705882]])

This code shows the principle. It can probably be sped up but it is reasonably efficient as is.

Calling pE.getPercentiles with equally spaced percentiles returns the bin boundaries of interest:

N=10
equally_spaced_percentiles = (np.arange(0,N-1)+1)/N

print(f'{equally_spaced_percentiles=}')
percentiles, error_intervals = pE.getPercentiles(equally_spaced_percentiles)

print(f'bin boundaries: {percentiles}')

For the Gaussian example I get (in one of the runs):

bin boundaries: [6.30392157 6.53921569 6.69607843 6.85294118 6.99019608 7.14705882
 7.30392157 7.46078431 7.6372549 ]

The internal bins are equally spaced in the example calls, which works well when the code is applied to data drawn from a Gaussian distribution. If however we deal with a ratio scale, for example the energy in sound, it is possible to take the logarithm of each value and then use equally spaced bin boundaries. Or alternatively, and in my opinion more efficiently, it is possible to choose the bin boundaries log-linearly and avoid the expensive log function, as sketched below.
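
A minimal sketch of the log-linear choice, using np.geomspace (511 boundaries keep the power-of-two bin count the class expects):

import numpy as np

a_min = 1e-3  # on a ratio scale the minimum must be > 0
a_max = 10.0

# 511 boundaries -> 512 bins, a power of 2 as percentileEstimator requires;
# consecutive boundaries differ by a constant factor, so no log() per sample
bin_boundaries = np.geomspace(a_min, a_max, 511)

pE = percentileEstimator(bin_boundaries)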

Upvotes: 0

Andre Ahmed

Reputation: 1891

What you need is a spatial data structure. You could use a quadtree: it subdivides the space into quads based on the distance between the points, so you can "tile" all the bins. You can use any grouping criterion, such as Euclidean distance, Hamming distance or min/max, to divide the space. After tiling the bins you can find their intersections in N log N by walking down the tree, and you can also query for any point, among other features. https://en.wikipedia.org/wiki/Quadtree or an easier approach: https://en.wikipedia.org/wiki/K-d_tree
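
As a rough one-dimensional analogue of such a tree, an order-statistics structure also gives O(log n) inserts and indexed lookups; here is a sketch with SortedList from the third-party sortedcontainers package (note that it keeps every value in memory, so it illustrates the tree idea rather than the low-memory requirement):

import random
from sortedcontainers import SortedList  # pip install sortedcontainers

values = SortedList()
for _ in range(100000):
    values.add(random.randint(0, 20))  # O(log n) insert keeps the values sorted

# exact bin boundaries are now simple index lookups
n_bins = 10
boundaries = [values[min(i * len(values) // n_bins, len(values) - 1)]
              for i in range(n_bins + 1)]
print(boundaries)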

Upvotes: 0

Anas Naguib

Reputation: 1116

You should use Python with Apache Spark; doing this operation in pure Python will consume a lot of time and will not be efficient.

https://spark.apache.org/

Another way is to try pandas if you need to work with Python only: https://pandas.pydata.org/
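
For instance, a minimal sketch with Spark's DataFrame.approxQuantile, which implements a variant of the Greenwald-Khanna streaming quantile algorithm (the local session, the column name "value" and the generated data are just for illustration):

import random
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("quantiles").getOrCreate()
df = spark.createDataFrame([(random.randint(0, 20),) for _ in range(100000)], ["value"])

n_bins = 10
probabilities = [i / n_bins for i in range(n_bins + 1)]  # 0.0, 0.1, ..., 1.0
# the third argument is the relative error: it trades accuracy for memory
boundaries = df.approxQuantile("value", probabilities, 0.001)
print(boundaries)  # n_bins + 1 approximate bin boundaries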

Upvotes: 0

Mustafa Gamal

Reputation: 11

I think you will need to sort the stream. You can achieve this (and I am here assuming you know the number of items in the stream and that your memory can handle at least two bins at a time) by doing the following; a code sketch follows the list:

  1. Store each bin on disk [bin_size = number_of_items_in_stream / number_of_bins].

  2. After the end of the stream, load each bin into memory, sort it, then store it on disk again, while saving the name of the bin and its min and max values in a data structure that holds these values together with the name of each bin.

  3. In the data structure, sort the bin names according to their min values.

  4. From step 3 you can identify which bins intersect with each other.

  5. Loop over the data structure, load every two intersecting bins into memory, and interchange their values with each other so that the two bins no longer have any intersecting values.

  6. After step 5, update the min and max values of the two bins in the data structure to the new min and max values.

  7. Your stream is now sorted.
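
Here is a minimal sketch of the same disk-backed idea, with one simplification: instead of the pairwise interchange of steps 4-6, each chunk is sorted before being written, and heapq.merge then performs a lazy k-way merge over the sorted chunks, so the boundaries can be read straight off the merged stream (the function names and chunk size are just illustrative, and n_items is assumed divisible by n_bins):

import heapq
import random
import tempfile

def write_sorted_chunk(chunk):
    # steps 1-2: one sorted chunk per temporary file
    f = tempfile.TemporaryFile(mode="w+")
    f.writelines("%s\n" % v for v in sorted(chunk))
    f.seek(0)
    return f

def external_bin_boundaries(stream, n_items, n_bins=10, chunk_size=100000):
    chunk_files = []
    chunk = []
    for value in stream:
        chunk.append(value)
        if len(chunk) == chunk_size:
            chunk_files.append(write_sorted_chunk(chunk))
            chunk = []
    if chunk:
        chunk_files.append(write_sorted_chunk(chunk))
    # lazy k-way merge: visits all values in sorted order, one line at a time
    merged = heapq.merge(*(map(float, f) for f in chunk_files))
    bin_size = n_items // n_bins
    boundaries = []
    for i, value in enumerate(merged):
        if i % bin_size == 0 or i == n_items - 1:
            boundaries.append(value)
    return boundaries

n = 1000000
print(external_bin_boundaries((random.randint(0, 20) for _ in range(n)), n))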

Upvotes: 1

Husam Khankan

Reputation: 29

If you know the expected length of the input beforehand, it is pretty easy, if I understand you correctly:

import random
random.seed(0)
count0=0
sum0=0
running_min0=None
running_max0=None
stream_len=100000000 #renamed from "len" to avoid shadowing the built-in

def get_bin_boundaries(n_bins=5): #returns the index boundaries for a known stream length
  res = []
  i = 0
  while i < stream_len:
    res.append(i)
    i += int(stream_len/n_bins)
  res.append(stream_len-1)
  return res #a list of n_bins+1 boundary values, e.g. [0,3,7,9,11,15]

for i in range(stream_len):
  cur_number=random.randint(0,20)
  count0+=1
  sum0+=cur_number
  running_mean0=sum0/count0
  if running_min0 is None or running_min0>cur_number:running_min0=cur_number
  if running_max0 is None or running_max0<cur_number:running_max0=cur_number
  running_bin_boundaries=get_bin_boundaries() #This is what I need

Upvotes: 0
