Reputation: 2234
I have a pyspark dataframe where there are 30 observations per unique ID like so:
id  time  features
1   0     [1,2,3]
1   1     [4,5,6]
..  ..    ..
1   29    [7,8,9]
2   0     [0,1,2]
2   1     [3,4,5]
..  ..    ..
2   29    [6,7,8]
..  ..    ..
What I need to do is create an array of sequences to feed into a keras neural network. So, for example, let's say I have the following smaller dataset for one id:
id  time  features
1   0     [1,2,3]
1   1     [4,5,6]
1   2     [7,8,9]
The desired data format is:
[[[1,2,3],
  [0,0,0],
  [0,0,0]],
 [[1,2,3],
  [4,5,6],
  [0,0,0]],
 [[1,2,3],
  [4,5,6],
  [7,8,9]]]
I can use the pad_sequences function from the keras package to add the [0,0,0] rows, so what I really need is a way to create the following array for all ids:
[[[1,2,3]],
 [[1,2,3],
  [4,5,6]],
 [[1,2,3],
  [4,5,6],
  [7,8,9]]]
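(For reference, the padding step I have in mind would look something like this; padding='post' is what puts the zero rows after the real observations, matching the desired format above:)

from keras.preprocessing.sequence import pad_sequences

seqs = [[[1, 2, 3]],
        [[1, 2, 3], [4, 5, 6]],
        [[1, 2, 3], [4, 5, 6], [7, 8, 9]]]
# appends [0,0,0] rows until every sequence has length 3
padded = pad_sequences(seqs, maxlen=3, padding='post')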
The only way I can think to do it is with loops, something like this:
x = []
for i in range(10000):
    user = x_train[i]
    arr = []
    for j in range(30):
        # cumulative slice: observations 0 through j inclusive
        arr.append(user[:j + 1])
    x.append(arr)
A loop solution isn't feasible, though: I have 904 batches of 10,000 unique ids, each with 30 observations. I'm collecting one batch at a time into a numpy array, so a numpy solution is fine. A pyspark solution using RDDs would be awesome. Something using map, perhaps?
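To make that concrete, here is a rough, untested sketch of the kind of RDD solution I'm imagining (assuming the dataframe is named df with the columns shown above):

rdd = df.rdd.map(lambda row: (row['id'], (row['time'], row['features'])))

def prefixes(rows):
    # sort each id's observations by time, then build the cumulative slices
    feats = [f for _, f in sorted(rows)]
    return [feats[:j + 1] for j in range(len(feats))]

sequences = rdd.groupByKey().mapValues(prefixes)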
Upvotes: 0
Views: 365
Reputation: 1161
Why don't you do something along these lines:
dict1 = {}
for tuple1 in your_collection:
    if tuple1['id'] not in dict1:
        # If we've never seen the id, add a list of lists of feature
        # lists as the entry.
        dict1[tuple1['id']] = [[tuple1['features']]]
    else:
        # If we've seen this id, take the previous (n-1) list of feature
        # lists from the current entry, copy it, append the current list
        # of features to the copy, and append the result back to the entry
        # (which is essentially a 3d matrix). Each entry is therefore a
        # 3d list keyed off by id.
        prev_list = dict1[tuple1['id']][-1][:]
        prev_list.append(tuple1['features'])
        dict1[tuple1['id']].append(prev_list)
This has poor space complexity, but it may work if you're dealing with a set of limited size.
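For instance, tracing the loop on a small hypothetical input (rows sorted by time within each id):

your_collection = [
    {'id': 1, 'time': 0, 'features': [1, 2, 3]},
    {'id': 1, 'time': 1, 'features': [4, 5, 6]},
    {'id': 1, 'time': 2, 'features': [7, 8, 9]},
]
# after the loop, dict1[1] holds the cumulative sequences:
# [[[1, 2, 3]],
#  [[1, 2, 3], [4, 5, 6]],
#  [[1, 2, 3], [4, 5, 6], [7, 8, 9]]]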
Upvotes: 0
Reputation: 53099
Here is a numpy solution that creates the desired output, including the zero padding. It uses triu_indices to create the "cumulative time series" structure:
import numpy as np
from timeit import timeit

def time_series(nids, nsteps, features):
    # reshape flat (nids * nsteps, nfeatures) input to (nids, nsteps, nfeatures)
    f3d = np.reshape(features, (nids, nsteps, -1))
    # zero-filled output: one (nsteps, nfeatures) block per time step
    f4d = np.zeros((nids, nsteps, nsteps, f3d.shape[-1]), f3d.dtype)
    # upper-triangle pairs i <= j: block j receives observations 0..j
    i, j = np.triu_indices(nsteps)
    f4d[:, j, i, :] = f3d[:, i, :]
    return f4d

nids = 2
nsteps = 4
nfeatures = 3
features = np.random.randint(1, 100, (nids * nsteps, nfeatures))
print('small example', time_series(nids, nsteps, features))

nids = 10000
nsteps = 30
nfeatures = 3
features = np.random.randint(1, 100, (nids * nsteps, nfeatures))
print('time needed for big example {:6.4f} secs'.format(
    timeit(lambda: time_series(nids, nsteps, features), number=10) / 10))
output:
small example [[[[76 53 48]
   [ 0  0  0]
   [ 0  0  0]
   [ 0  0  0]]

  [[76 53 48]
   [46 59 76]
   [ 0  0  0]
   [ 0  0  0]]

  [[76 53 48]
   [46 59 76]
   [62 39 17]
   [ 0  0  0]]

  [[76 53 48]
   [46 59 76]
   [62 39 17]
   [61 90 69]]]


 [[[68 32 20]
   [ 0  0  0]
   [ 0  0  0]
   [ 0  0  0]]

  [[68 32 20]
   [47 11 72]
   [ 0  0  0]
   [ 0  0  0]]

  [[68 32 20]
   [47 11 72]
   [30  3  9]
   [ 0  0  0]]

  [[68 32 20]
   [47 11 72]
   [30  3  9]
   [28 73 78]]]]
time needed for big example 0.2251 secs
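To see why the triu_indices assignment builds the cumulative structure, it helps to look at the index pairs for a small case, e.g. nsteps = 3:

import numpy as np

i, j = np.triu_indices(3)
# i = [0 0 0 1 1 2], j = [0 1 2 1 2 2], so every pair satisfies i <= j,
# and f4d[:, j, i] = f3d[:, i] copies observation i into every block j
# with j >= i, i.e. block j ends up holding observations 0 through j
print(list(zip(j, i)))  # [(0, 0), (1, 0), (2, 0), (1, 1), (2, 1), (2, 2)]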
Upvotes: 1