shakedzy

Reputation: 2893

Spark SQL computing rows it shouldn't

I have a DataFrame loaded from a parquet file which stores many columns. Two of these are a user identifier column and a column with the state the user visited. The user identifier column is stored as an array (a WrappedArray, as it's Spark) of arrays, where every sub-array has the identifier type as its first element and its value as the second. For example, a user named Jon Smith with ID 1045 will be stored as: WrappedArray(WrappedArray("name","Jon Smith"), WrappedArray("id","1045")). (The sub-arrays are arrays of Strings.)
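
To make the shape concrete, here is roughly how a couple of such rows could be built by hand (a minimal sketch with made-up values; the real data comes from the parquet file, and the name sampleDF is hypothetical):

import sqlContext.implicits._

// hypothetical sample rows mirroring the parquet schema:
// uid is an array of [type, value] string pairs, state is a plain string
val sampleDF = Seq(
  (Seq(Seq("name", "Jon Smith"), Seq("id", "1045")), "TX"),
  (Seq(Seq("name", "Jon Smith"), Seq("id", "1045")), "ND"),
  (Seq(Seq("name", "Jane Katz"), Seq("id", "1056")), "IO")
).toDF("uid", "state")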

So the table looks like this:

uid                                                                        | state
---------------------------------------------------------------------------------------
WrappedArray(WrappedArray("name","Jon Smith"), WrappedArray("id","1045"))  | TX
WrappedArray(WrappedArray("name","Jon Smith"), WrappedArray("id","1045"))  | ND
WrappedArray(WrappedArray("name","Jane Katz"), WrappedArray("id","1056"))  | IO

and so on. I want a table with the ID of each user and the number of states he's been to:

id    |  states
--------------------
1045  |  2
1056  |  1

So for this I've created a new UDF that parses the ID from the uid array, which looks like this:

import scala.collection.mutable

// returns the value stored under the "id" key, or null if there is none
def IDfromUID(uid: mutable.WrappedArray[mutable.WrappedArray[String]]): String = {
    val ID = uid.filter(_(0) == "id")
    ID.length match {
        case 0 => null
        case _ => ID(0)(1)
    }
}
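
Calling it directly on a hand-built array behaves as expected (a quick sanity check with hypothetical values):

val sample: mutable.WrappedArray[mutable.WrappedArray[String]] =
    Array[mutable.WrappedArray[String]](Array("name", "Jon Smith"), Array("id", "1045"))

IDfromUID(sample)                                                            // "1045"
IDfromUID(Array[mutable.WrappedArray[String]](Array("name", "Jon Smith")))   // null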

and I use it in the following query:

sqlContext.udf.register("IDfromUID", (uid: mutable.WrappedArray[mutable.WrappedArray[String]]) => IDfromUID(uid))
df.registerTempTable("RawData")
sqlContext.sql("with Data as (select IDfromUID(uid) as id, state from RawData where uid is not null) select id, count(state) as states from Data where id not null group by id")

Yet, despite the fact that I specify where uid is not null, I still get a NullPointerException coming from IDfromUID. It stops only when I change the UDF to:

import scala.collection.mutable

def IDfromUID(uid: mutable.WrappedArray[mutable.WrappedArray[String]]): String = {
    if (uid == null) null
    else {
        val ID = uid.filter(_(0) == "id")
        ID.length match {
            case 0 => null
            case _ => ID(0)(1)
        }
    }
}

which leaves me with the question - why does Spark try to compute rows of data it's strictly told not to?

I use Spark 1.6.2, and there are multiple queries running in parallel using the same UDF.

Upvotes: 0

Views: 75

Answers (1)

Tim Biegeleisen

Reputation: 521239

I don't know if this answer/note will have any value for you, but I noticed a few things in your query. First, in the following CTE you already filter out NULL uid values:

WITH Data AS
(
    SELECT IDfromUID(uid) AS id, state
    FROM RawData
    WHERE uid IS NOT NULL
)

I also noticed that there seemed to be a typo in your query:

select id, count(state) as states from Data where id not null group by id

The typo being where id not null, which is missing the IS keyword. I think you can remove this WHERE clause entirely:

SELECT id, count(state) AS states
FROM Data
GROUP BY id
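
Putting the two parts together, the whole statement would then look something like this (just a sketch, keeping the UDF and temp table names from your question):

sqlContext.sql("""
    WITH Data AS
    (
        SELECT IDfromUID(uid) AS id, state
        FROM RawData
        WHERE uid IS NOT NULL
    )
    SELECT id, count(state) AS states
    FROM Data
    GROUP BY id
""")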

Upvotes: 1
