Reputation: 2893
I have a DataFrame loaded from a parquet file which stores many columns. Two of them matter here: an array of user identifiers and the state the user visited. The user identifier column is stored as an array (a WrappedArray, since it's Spark) of arrays, where every sub-array has the identifier type as its first element and its value as the second. For example, a user named Jon Smith with ID 1045 is stored as WrappedArray(WrappedArray("name","Jon Smith"), WrappedArray("id","1045")) (the sub-arrays are arrays of Strings).
So the table looks like this:
uid | state
---------------------------------------------------------------------------------------
WrappedArray(WrappedArray("name","Jon Smith"), WrappedArray("id","1045")) | TX
WrappedArray(WrappedArray("name","Jon Smith"), WrappedArray("id","1045")) | ND
WrappedArray(WrappedArray("name","Jane Katz"), WrappedArray("id","1056")) | IO
and so on. I want a table with the ID of each user and the number of states they've been to:
id | states
--------------------
1045 | 2
1056 | 1
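For reference, here is a minimal sketch that builds a DataFrame in this shape (assuming a local SQLContext and in-memory rows in place of the parquet source):
import sqlContext.implicits._

// uid comes out as array<array<string>>, state as string
val df = Seq(
  (Seq(Seq("name", "Jon Smith"), Seq("id", "1045")), "TX"),
  (Seq(Seq("name", "Jon Smith"), Seq("id", "1045")), "ND"),
  (Seq(Seq("name", "Jane Katz"), Seq("id", "1056")), "IO")
).toDF("uid", "state")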
So for this I've created a new UDF that parses the ID from the uid array, which looks like this:
import scala.collection.mutable

def IDfromUID(uid: mutable.WrappedArray[mutable.WrappedArray[String]]): String = {
  val ID = uid.filter(_(0) == "id")
  ID.length match {
    case 0 => null
    case _ => ID(0)(1)
  }
}
and I use it in the following query:
sqlContext.udf.register("IDfromUID",
  (uid: mutable.WrappedArray[mutable.WrappedArray[String]]) => IDfromUID(uid))
df.registerTempTable("RawData")
sqlContext.sql("with Data as (select IDfromUID(uid) as id, state from RawData where uid is not null) select id, count(state) as states from Data where id not null group by id")
Yet, despite the where uid is not null filter, I still get a NullPointerException coming from IDfromUID. It stops only when I change the UDF to:
import scala.collection.mutable

def IDfromUID(uid: mutable.WrappedArray[mutable.WrappedArray[String]]): String = {
  if (uid == null) null
  else {
    val ID = uid.filter(_(0) == "id")
    ID.length match {
      case 0 => null
      case _ => ID(0)(1)
    }
  }
}
which leaves me with the question: why does Spark try to compute rows of data it's explicitly told not to?
I use Spark 1.6.2, and there are multiple queries running in parallel using the same UDF.
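For what it's worth, the same null-safe behaviour can be written more compactly with Option (a sketch equivalent to the guarded version above):
import scala.collection.mutable

// Option(uid) absorbs a null input, find picks the first sub-array whose
// key is "id", and orNull restores the String-or-null result.
def IDfromUID(uid: mutable.WrappedArray[mutable.WrappedArray[String]]): String =
  Option(uid)
    .flatMap(_.find(_(0) == "id"))
    .map(_(1))
    .orNull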
Upvotes: 0
Views: 75
Reputation: 521239
I don't know if this answer/note will have any value for you, but I noticed a few things in your query. First, in the following CTE you already filter out NULL uid values:
WITH Data AS
(
SELECT IDfromUID(uid) AS id, state
FROM RawData
WHERE uid IS NOT NULL
)
I also noticed that there seemed to be a typo in your query:
select id, count(state) as states from Data where id not null group by id
The typo being where id not null (it should read where id IS NOT NULL). I think you can remove this WHERE clause entirely:
SELECT id, count(state) AS states
FROM Data
GROUP BY id
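Putting it together, the whole statement as called from Scala would then be (a sketch, unchanged apart from dropping the redundant filter):
sqlContext.sql("""
  WITH Data AS
  (
      SELECT IDfromUID(uid) AS id, state
      FROM RawData
      WHERE uid IS NOT NULL
  )
  SELECT id, count(state) AS states
  FROM Data
  GROUP BY id
""")
Note that if IDfromUID still returns null for some rows that pass the uid filter, those rows will now group under a null id; keeping a filter written as WHERE id IS NOT NULL would exclude them.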
Upvotes: 1