Geoffrey Anderson

Reputation: 1574

How to convert numpy array elements to spark RDD column values

I want to use the built-in CSV writer of the Spark DataFrame (not pandas). I already have an IndexedRowMatrix built, so its rows contain sparse vectors. The conversion to a dense array is done by the x.vector.toArray() map call (below). What I cannot figure out is how to get the dense numpy array elements into individual columns of a Spark DataFrame (I'd rather not introduce pandas). How do I get this RDD into a 7-column DataFrame, consisting of one string column and six integer columns? My code so far:

X = CoordinateMatrix(sc.parallelize(entries)) 
Xirm = X.toIndexedRowMatrix()
colnames = "username," + ','.join(str(cell) for cell in itemids.keys())  # Make CSV header line
# Might need this for speed: Arrow:  https://bryancutler.github.io/createDataFrame/  See above conf=...
XX = Xirm.rows.map(lambda x: (lu[x.index], x.vector.toArray())) # ?
print(XX.take(2))
df = XX.toDF() #TypeError: Can not infer schema for type: <class 'numpy.ndarray'>
#df.write.csv(header=colnames, path=out_filename)

Here is the output of take(2), to show an example of the data:

[('kygiacomo', array([ 0.,  1.,  0.,  0.,  0.,  0.])), ('namohysip', array([ 1.,  0.,  0.,  0.,  0.,  0.]))]
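
For reference, this is roughly the CSV output I'm after (the six item-id column names below are just placeholders; the real ones come from itemids.keys()):

username,item1,item2,item3,item4,item5,item6
kygiacomo,0,1,0,0,0,0
namohysip,1,0,0,0,0,0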

The problem is that the RDD tuple has 2 elements, but I need 7 columns in the DataFrame. The number of columns is determined dynamically, and I have the column names in the colnames variable, but I don't know how to feed that in either. Again, the goal is to output a CSV file "equivalent" (many part files are fine) using Spark's built-in CSV writer on the DataFrame (I'm on Spark 2.3.0). Ideally the reals would be written as ints, with no quotes around any data values. But the 2-to-7 column conversion is the really tough problem here at the moment. Thanks for any tips.

Upvotes: 0

Views: 1417

Answers (1)

user9926841

Reputation: 11

A simple conversion to plain Python types and unpacking should do the trick:

Xirm.rows.map(lambda x: (lu[x.index], *x.vector.toArray().tolist()))

which is equivalent to

Xirm.rows.map(lambda x: [lu[x.index]] + x.vector.toArray().tolist())
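
From there, a rough sketch of building the full DataFrame and writing the CSV could look like this (assuming colnames and out_filename are defined as in the question; the int cast is only there to match the "write reals as ints" goal):

from pyspark.sql.functions import col

names = colnames.split(',')  # ['username', ...] - one name per column
rows = Xirm.rows.map(lambda x: [lu[x.index]] + x.vector.toArray().tolist())
df = rows.toDF(names)  # plain Python floats/strings, so schema inference works
df = df.select(names[0], *[col(c).cast('int') for c in names[1:]])  # reals -> ints
df.write.csv(path=out_filename, header=True)  # header=True writes the column names

By default the CSV writer only quotes values when it has to, so plain ints and usernames should come out unquoted.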

Upvotes: 1
