Reputation: 2490
I’m doing some log analysis, examining the length of a queue every few minutes. I know when the files entered the “queue” (a simple filesystem directory) and when they left. With that, I can plot the length of the queue at given intervals. So far so good, though the code is a bit procedural:
import numpy as np
import pandas as pd

ts = pd.date_range(start='2012-12-05 10:15:00', end='2012-12-05 15:45', freq='5t')
tmpdf = df.copy()
for d in ts:
    # True for rows that had entered but not yet left at instant d
    tmpdf[d] = (tmpdf.date_in < d) & (tmpdf.date_out > d)
queue_length = tmpdf[list(ts)].apply(func=np.sum)
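As an aside, I suspect the loop over ts could be replaced by two numpy searchsorted calls, since the length at instant d is just #(date_in < d) minus #(date_out <= d). A sketch, assuming date_in and date_out are proper datetime64 columns:

ins = np.sort(df.date_in.values)
outs = np.sort(df.date_out.values)
# entered strictly before each sample instant, minus left at or before it
queue_length = (np.searchsorted(ins, ts.values, side='left')
                - np.searchsorted(outs, ts.values, side='right'))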
But I want to compare the real length with the length at a given consumption rate (e.g. 1 per second). I can’t just subtract a constant, because the queue can’t go below zero.
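To illustrate with made-up numbers: plain cumulative subtraction can go negative, and clipping only at the end gives the wrong answer; the floor has to be applied after every step:

arrivals = np.array([5, 1, 0, 4])      # items entering per interval
naive = np.cumsum(arrivals - 3)        # rate 3: [2, 0, -3, -2] -> goes negative
acc, clipped = 0, []
for a in arrivals:
    acc = max(0, acc + a - 3)          # floor applied at every step
    clipped.append(acc)                # [2, 0, 0, 1] != np.maximum(naive, 0)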
I have done it, but in a very procedural way. I tried to use pandas window functions, with little success, because I can’t access the result that has already been calculated for the previous element. This was the first thing I tried, which is dead wrong:
imagenes_min = 60 * imagenes_sec

def roll(window_vals):
    # wrong: the window holds raw input values, not previously computed results
    return max(0.0, window_vals[-1] + window_vals[-2] - imagenes_min)

pd.rolling_apply(arg=imagenes_introducidas, func=roll, window=2, min_periods=2)
The real code is like this, which I think is too verbose and slow:
from pandas import Series, DataFrame

imagenes_sec = 1.05
imagenes_min = imagenes_sec * 60 * 5
imagenes_introducidas = df3.aet.resample(rule='5t', how='count')
imagenes_introducidas.head()

def accum_minus(serie, rate):
    # running total that is clipped at zero after every step
    acc = 0
    retval = np.zeros(len(serie))
    for i, a in enumerate(serie.values):
        acc = max(0, a + acc - rate)
        retval[i] = acc
    return Series(data=retval, index=serie.index)

est_1 = accum_minus(imagenes_introducidas, imagenes_min)
comparativa = DataFrame(data={'real': queue_length, 'est_1_sec': est_1})
comparativa.plot()
This seems like an easy task, but I don’t know how to do it properly. Maybe pandas isn’t the right tool and some numpy or scipy magic is needed.
UPDATE: df3 is like this (some columns omitted):
aet date_out
date_in
2012-12-05 10:08:59.318600 Z2XG17 2012-12-05 10:09:37.172300
2012-12-05 10:08:59.451300 Z2XG17 2012-12-05 10:09:38.048800
2012-12-05 10:08:59.587400 Z2XG17 2012-12-05 10:09:39.044100
UPDATE 2: This seems faster, but it’s still not very elegant:
imagenes_sec = 1.05
imagenes_min = imagenes_sec * 60 * 5
imagenes_introducidas = df3.aet.resample(rule='5t', how='count')

def add_or_zero(x, y):
    return max(0.0, x + y - imagenes_min)

# wrap the binary function as a ufunc so accumulate() can chain it
v_add_or_zero = np.frompyfunc(add_or_zero, 2, 1)
xx = v_add_or_zero.accumulate(imagenes_introducidas.values, dtype=np.object)
dd = DataFrame(data={'est_1_sec': xx, 'real': queue_length},
               index=imagenes_introducidas.index)
dd.plot()
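One wrinkle I noticed: np.frompyfunc always produces an object-dtype array, so it is probably worth casting the accumulated result back to floats before further numeric work:

xx = xx.astype(np.float64)  # frompyfunc/accumulate yields dtype=object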
Upvotes: 1
Views: 918
Reputation: 49886
How about interleaving inbound and outbound events into a single frame?
In [15]: df
Out[15]:
date_in aet date_out
0 2012-12-05 10:08:59.318600 Z2XG17 2012-12-05 10:09:37.172300
1 2012-12-05 10:08:59.451300 Z2XG17 2012-12-05 10:09:38.048800
2 2012-12-05 10:08:59.587400 Z2XG17 2012-12-05 10:09:39.044100
In [16]: inbnd = pd.DataFrame({'event': 1}, index=df.date_in)
In [17]: outbnd = pd.DataFrame({'event': -1}, index=df.date_out)
In [18]: real_stream = pd.concat([inbnd, outbnd]).sort()
In [19]: real_stream
Out[19]:
event
date
2012-12-05 10:08:59.318600 1
2012-12-05 10:08:59.451300 1
2012-12-05 10:08:59.587400 1
2012-12-05 10:09:37.172300 -1
2012-12-05 10:09:38.048800 -1
2012-12-05 10:09:39.044100 -1
In this format (one decrement for every increment), queue depth can easily be computed with cumsum().
In [20]: real_stream['depth'] = real_stream.event.cumsum()
In [21]: real_stream
Out[21]:
event depth
date
2012-12-05 10:08:59.318600 1 1
2012-12-05 10:08:59.451300 1 2
2012-12-05 10:08:59.587400 1 3
2012-12-05 10:09:37.172300 -1 2
2012-12-05 10:09:38.048800 -1 1
2012-12-05 10:09:39.044100 -1 0
To simulate different consumption rates, replace all the real outbound timestamps with a manufactured series of outbound timestamps at a fixed frequency. Since cumsum() won't work in this case, I created a counting function that takes a floor value.
In [53]: outbnd_1s = pd.DataFrame({'event': -1},
   ....:                          index=real_stream.event.resample("S").index)
In [54]: fixed_stream = pd.concat([inbnd, outbnd_1s]).sort()
In [55]: def make_floor_counter(floor):
   ....:     count = [0]
   ....:     def process(n):
   ....:         # stateful closure: accumulate, but never drop below floor
   ....:         count[0] += n
   ....:         if count[0] < floor:
   ....:             count[0] = floor
   ....:         return count[0]
   ....:     return process
   ....:
In [56]: fixed_stream['depth'] = fixed_stream.event.map(make_floor_counter(0))
In [57]: fixed_stream.head(8)
Out[57]:
event depth
2012-12-05 10:08:59 -1 0
2012-12-05 10:08:59.318600 1 1
2012-12-05 10:08:59.451300 1 2
2012-12-05 10:08:59.587400 1 3
2012-12-05 10:09:00 -1 2
2012-12-05 10:09:01 -1 1
2012-12-05 10:09:02 -1 0
2012-12-05 10:09:03 -1 0
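If the stream gets large, the same floored accumulation could presumably be vectorized with the frompyfunc/accumulate trick from the question's UPDATE 2. A sketch, assuming fixed_stream as above (a zero is prepended so the floor also applies to the first element, then dropped from the result):

import numpy as np

# hypothetical vectorized variant of the floor counter
floored_add = np.frompyfunc(lambda acc, n: max(0, acc + n), 2, 1)
vals = np.concatenate(([0], fixed_stream.event.values))
fixed_stream['depth'] = floored_add.accumulate(vals, dtype=np.object)[1:]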
Upvotes: 2