user3448011

Reputation: 1599

Error finding the index of the max value in a PySpark DataFrame vector column

I would like to find the index of the maximum value in a vector column of a Spark DataFrame using PySpark.

My Spark version is:

   3.0.0

The DataFrame:

  id   val  (vector (nullable = true))
  516   0: 1 1: 10 2: [] 3:[0.162, 0.511, 0.022, ....]

Is this a sparse vector? How can I access the array?

  [0.162, 0.511, 0.022, ....]

Based on "How to find the index of the maximum value in a vector column?", "How to get the index of the highest value in a list per row in a Spark DataFrame? [PySpark]", and "How to find the argmax of a vector in PySpark ML",

it looks like a dense vector. My code:

   import numpy as np
   import pyspark.sql.functions as F
   from pyspark.ml.functions import vector_to_array
   from pyspark.ml.linalg import DenseVector, SparseVector
   from pyspark.sql.types import IntegerType

   def max_index(a_col):
       if not a_col:
          return a_col
       if isinstance(a_col, SparseVector):
          a_col = DenseVector(a_col)
       a_col = vector_to_array(a_col)
       return np.argmax(a_col)

   my_f = F.udf(max_index, IntegerType())

   t = df.withColumn("max_index_col", my_f("val"))  # max_index does not work here: a NoneType error is raised

   t.show()

Error:

  AttributeError: 'NoneType' object has no attribute '_jvm'

I have tried all the solutions mentioned in the links above, but none of them work.

Did I miss something?

Thanks.

UPDATE: I also tried:

 from pyspark.sql.types import ArrayType, FloatType

 vec_to_array = F.udf(lambda v: v.toArray().tolist(), ArrayType(FloatType()))

 def find_max_index(v):
     return F.array_position(v, F.array_max(v))
 
 t = df.withColumn("array_col", vec_to_array(F.col("features"))) 
 t.withColumn("max_index", find_max_index(F.col("array_col"))).show(truncate=False)

This gives the same error.

Upvotes: 3

Views: 865

Answers (1)

werner

Reputation: 14845

For Spark >= 3.0.0, vector_to_array can be used to convert the vector into an array. The index of the maximum value can then be found with an SQL expression:

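import pyspark.sql.functions as F
from pyspark.ml.functions import vector_to_array

# Convert the vector column to an array, locate the maximum, then drop the helper column
df.withColumn("array", vector_to_array("vector")) \
 .withColumn("max_index_col", F.expr("array_position(array,array_max(array))")) \
 .drop("array") \
 .show()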
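
Note that array_position is 1-based, so the expression above returns 2 for a vector whose second entry is the largest. If a 0-based index is preferred, a plain Python UDF is one alternative. The AttributeError in the question is consistent with calling vector_to_array inside a UDF: it is a column function that needs the driver's JVM handle, which is not available where the UDF body runs. Below is a minimal sketch, assuming the vector column is named val as in the question; max_index and add_max_index are illustrative names, not part of PySpark.

```python
def max_index(vec):
    """Return the 0-based index of the largest entry, or None for a null or empty vector."""
    if vec is None:
        return None
    # DenseVector and SparseVector both provide toArray(); plain sequences pass through.
    values = vec.toArray().tolist() if hasattr(vec, "toArray") else list(vec)
    if not values:
        return None
    # First occurrence of the maximum, returned as a plain Python int.
    return max(range(len(values)), key=values.__getitem__)


def add_max_index(df, vector_col="val", out_col="max_index_col"):
    """Attach the argmax of `vector_col` to `df` (hypothetical helper)."""
    # Imported here so max_index can be tried without a Spark installation.
    import pyspark.sql.functions as F
    from pyspark.sql.types import IntegerType

    max_index_udf = F.udf(max_index, IntegerType())
    return df.withColumn(out_col, max_index_udf(F.col(vector_col)))
```

Calling add_max_index(df).show() would then display the new column. The built-in expression is usually the better choice for performance, since a Python UDF serializes every row to a Python worker.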

Upvotes: 3
