Reputation: 1752
In Scala + Spark, I have a dataframe with two columns of Array[String]:
scala> val test = spark.sqlContext.read.json(sc.parallelize(Seq("""{"v1":["A", "B", "C"],"v2":["ok", "false", "ok"]}""", """{"v1":["D", "E"],"v2":["false", "ok"]}""")))
test: org.apache.spark.sql.DataFrame = [v1: array<string>, v2: array<string>]
scala> test.show
+---------+---------------+
|       v1|             v2|
+---------+---------------+
|[A, B, C]|[ok, false, ok]|
|   [D, E]|    [false, ok]|
+---------+---------------+
scala> test.printSchema
root
 |-- v1: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- v2: array (nullable = true)
 |    |-- element: string (containsNull = true)
I would like to filter the elements in v1
based on the value at the corresponding index in v2.
I zipped the two columns to try filtering, but I don't see how to do it with .filter()
on a Row
of Array[String].
scala> val result = test.withColumn("result", arrays_zip($"v1", $"v2")).select("result")
result: org.apache.spark.sql.DataFrame = [result: array<struct<v1:string,v2:string>>]
scala> result.printSchema
root
 |-- result: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- v1: string (nullable = true)
 |    |    |-- v2: string (nullable = true)
scala> result.show
+--------------------+
|              result|
+--------------------+
|[[A, ok], [B, fal...|
|[[D, false], [E, ...|
+--------------------+
Ideally, at the end, I want a single column of values like the one below, on which I'll count, sort and take distinct values:
+------+
|result|
+------+
|     A|
|     C|
|     E|
+------+
I have 19 million rows, and each row's arrays are around a thousand elements long, so for performance reasons I want to stick to built-in Spark functions and avoid UDFs if possible.
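For illustration, the kind of built-in-only approach I'm after would look something like this sketch (assuming Spark 2.4+, where the filter and transform higher-order functions exist; untested):

import org.apache.spark.sql.functions.{col, explode, expr}

// Keep each v1 element whose v2 entry at the same index is "ok",
// then explode the surviving elements into one row per value.
val kept = test
  .select(expr("transform(filter(arrays_zip(v1, v2), p -> p.v2 = 'ok'), p -> p.v1)").as("result"))
  .select(explode(col("result")).as("result"))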
Upvotes: 1
Views: 145
Reputation: 384
I think you are almost there. You can apply explode()
to the zipped column and then filter on the required condition. The code below will give you the rows with v2 = ok
, on which you can execute count, sort, distinct etc.
scala> val result = test.withColumn("result", explode(arrays_zip($"v1", $"v2"))).select("result")
result: org.apache.spark.sql.DataFrame = [result: struct<v1:string,v2:string>]
scala> result.show(false)
+----------+
|result    |
+----------+
|[A, ok]   |
|[B, false]|
|[C, ok]   |
|[D, false]|
|[E, ok]   |
+----------+
scala> val data = result.filter(col("result.v2").equalTo("ok")).select(col("result.v1"))
data: org.apache.spark.sql.DataFrame = [v1: string]
scala> data.show
+---+
| v1|
+---+
|  A|
|  C|
|  E|
+---+
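From here, count, sort and distinct are ordinary DataFrame operations, for example:

// Distinct matching values, sorted, plus the total number of matching rows.
val distinctSorted = data.distinct().orderBy("v1")
distinctSorted.show()
println(data.count())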
Upvotes: 3