nest

Reputation: 347

Migrating specific SQL query to Spark's UDF

I'm migrating the following PostgreSQL function to a Spark SQL UDF.

DROP FUNCTION IF EXISTS anyarray_enumerate(anyarray);
CREATE FUNCTION anyarray_enumerate(anyarray)
RETURNS TABLE (index bigint, value anyelement) AS
$$
SELECT
    row_number() OVER (),
    value
FROM (
    SELECT unnest($1) AS value
) AS unnested
$$
LANGUAGE sql IMMUTABLE;

The output I get from Spark SQL is not similar to what I obtain in SQL. Any help or ideas?

demo=# select anyarray_enumerate(array[599,322,119,537]);
anyarray_enumerate
--------------------
 (1,599)
 (2,322)
 (3,119)
 (4,537)
(4 rows)

My current code is:

import scala.collection.mutable.WrappedArray

def anyarray_enumerate[T](anyarray: WrappedArray[T]) = anyarray.zipWithIndex

// Register the function as a UDF so it can be used in SQL statements.
sqlContext.udf.register("anyarray_enumerate", anyarray_enumerate(_: WrappedArray[Int]))

Thank you

Upvotes: 1

Views: 93

Answers (1)

Matt Zimmerman

Reputation: 91

Your UDF returns the whole array of tuples in a single row:

spark.sql("select anyarray_enumerate(array(599, 322, 119, 537)) as foo").show()
+--------------------+
|                 foo|
+--------------------+
|[[599,0], [322,1]...|
+--------------------+

but you can use the explode() function to split that into multiple rows:

spark.sql("select explode(anyarray_enumerate(array(599, 322, 119, 537))) as foo").show()
+-------+
|    foo|
+-------+
|[599,0]|
|[322,1]|
|[119,2]|
|[537,3]|
+-------+

Also, the zipWithIndex method returns the value first and index second, unlike your SQL command, but that's easily fixed in the UDF.
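A minimal sketch of that fix, assuming the same WrappedArray-based UDF from the question, could swap the tuple order and start the index at 1 to match row_number():

import scala.collection.mutable.WrappedArray

// Sketch: emit (index, value) pairs with a 1-based index so the result
// mirrors the SQL function's row_number() ordering.
def anyarray_enumerate[T](anyarray: WrappedArray[T]) =
  anyarray.zipWithIndex.map { case (value, index) => (index + 1L, value) }

sqlContext.udf.register("anyarray_enumerate", anyarray_enumerate(_: WrappedArray[Int]))

Used with explode() as above, each output row then contains (1, 599), (2, 322), and so on.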

Upvotes: 1
