Krzysztof Fajst

Reputation: 161

UDF: 'TypeError: 'int' object is not iterable'

I'm converting Scala code to Python. The Scala code uses a UDF.

def getVectors(searchTermsToProcessWithTokens: Dataset[Person]): Dataset[Person] = {

    import searchTermsToProcessWithTokens.sparkSession.implicits._

    def addVectors(
      tokensToSearchFor: String,
      tokensToSearchIn: String
    ): Seq[Int] = {
      tokensToSearchFor.map(token => if (tokensToSearchIn.contains(token)) 1 else 0)
    }

    val addVectorsUdf: UserDefinedFunction = udf(addVectors _)

    searchTermsToProcessWithTokens
      .withColumn("search_term_vector", addVectorsUdf($"name", $"age"))
      .withColumn("keyword_text_vector", addVectorsUdf($"name", $"age"))
      .as[Person]
}

My Python conversion:

def getVectors(searchTermsToProcessWithTokens): 

    def addVectors(tokensToSearchFor: str, tokensToSearchIn: str):
      tokensToSearchFor = [1 if (token in tokensToSearchIn) else 0 for token in tokensToSearchIn]
      return tokensToSearchFor

    addVectorsUdf = udf(addVectors, ArrayType(StringType()))

    TokenizedSearchTerm = searchTermsToProcessWithTokens \
      .withColumn("search_term_vector", addVectorsUdf(col("name"), col("age"))) \
      .withColumn("keyword_text_vector", addVectorsUdf(col("name"), col("age")))

    return TokenizedSearchTerm

Defining a simple Dataset in Scala like

case class Person(name: String, age: Int)

val personDS = Seq(Person("Max", 33), Person("Adam", 32), Person("Muller", 62)).toDS()
personDS.show()
// +------+---+
// |  name|age|
// +------+---+
// |   Max| 33|
// |  Adam| 32|
// |Muller| 62|
// +------+---+

I get this output from the Scala function:

val x = getVectors(personDS)

x.show()
// +------+---+------------------+-------------------+
// |  name|age|search_term_vector|keyword_text_vector|
// +------+---+------------------+-------------------+
// |   Max| 33|         [0, 0, 0]|          [0, 0, 0]|
// |  Adam| 32|      [0, 0, 0, 0]|       [0, 0, 0, 0]|
// |Muller| 62|[0, 0, 0, 0, 0, 0]| [0, 0, 0, 0, 0, 0]|
// +------+---+------------------+-------------------+

But for the same data defined as a PySpark DataFrame

%python
personDF = spark.createDataFrame([["Max", 32], ["Adam", 33], ["Muller", 62]], ['name', 'age'])

+------+---+
|  name|age|
+------+---+
|   Max| 32|
|  Adam| 33|
|Muller| 62|
+------+---+

I get this from the Python version:

An exception was thrown from a UDF: 'TypeError: 'int' object is not iterable'

What is wrong with this conversion?

Upvotes: 1

Views: 372

Answers (1)

ZygD

Reputation: 24356

It's because your tokensToSearchIn is an integer, while it should be a string: iterating over an int is what raises the TypeError. You also iterate over tokensToSearchIn, whereas the Scala code iterates over the tokens to search for. The following works:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, StringType

def getVectors(searchTermsToProcessWithTokens):
    def addVectors(tokensToSearchFor: str, tokensToSearchIn: str):
        return [1 if token in str(tokensToSearchIn) else 0 for token in tokensToSearchFor]

    addVectorsUdf = udf(addVectors, ArrayType(StringType()))

    TokenizedSearchTerm = searchTermsToProcessWithTokens \
      .withColumn("search_term_vector", addVectorsUdf(col("name"), col("age"))) \
      .withColumn("keyword_text_vector", addVectorsUdf(col("name"), col("age")))

    return TokenizedSearchTerm
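To see the root cause outside Spark: iterating over a Python int raises exactly the error from the question, while iterating over the other string and casting the int to str works. A plain-Python sketch of the fix:

```python
# Iterating an int raises the exact error reported by the UDF.
try:
    [token for token in 33]
except TypeError as e:
    print(e)  # 'int' object is not iterable

# The fixed logic: iterate the search-for string, compare against str(age).
print([1 if token in str(33) else 0 for token in "Max"])  # [0, 0, 0]
```

This also matches the Scala output for the row ("Max", 33): none of the characters of "Max" occur in "33", so the vector is all zeros.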

For the sake of curiosity, you don't need a UDF. But it doesn't look much simpler...

from pyspark.sql import functions as F

def getVectors(searchTermsToProcessWithTokens): 
    def addVectors(tokensToSearchFor: str, tokensToSearchIn: str): 
      def arr(s: str):
        return F.split(F.col(s), '(?!$)')
      return F.transform(arr(tokensToSearchFor), lambda token: F.when(F.array_contains(arr(tokensToSearchIn), token), 1).otherwise(0))

    TokenizedSearchTerm = searchTermsToProcessWithTokens \
      .withColumn("search_term_vector", addVectors("name", "age")) \
      .withColumn("keyword_text_vector", addVectors("name", "age"))

    return TokenizedSearchTerm
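As a quick sanity check of the per-row logic outside Spark (add_vectors here is a hypothetical plain-Python mirror of the fixed UDF body, not Spark code), it reproduces the Scala output for all three rows:

```python
def add_vectors(tokens_to_search_for, tokens_to_search_in):
    # Mirror of the fixed UDF body: each character of the first string is
    # checked against the string form of the second value.
    return [1 if token in str(tokens_to_search_in) else 0
            for token in tokens_to_search_for]

for name, age in [("Max", 33), ("Adam", 32), ("Muller", 62)]:
    print(name, add_vectors(name, age))
# Max [0, 0, 0]
# Adam [0, 0, 0, 0]
# Muller [0, 0, 0, 0, 0, 0]
```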

Upvotes: 2
