crystyxn
crystyxn

Reputation: 1601

Retrieving nested column in python spark dataframe

Part of my df schema:

-- result: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- error: string (nullable = true)
 |    |    |-- hop: long (nullable = true)
 |    |    |-- resuLt: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- from: string (nullable = true)
 |    |    |    |    |-- rtt: double (nullable = true)
 |    |    |    |    |-- size: long (nullable = true)
 |    |    |    |    |-- ttl: long (nullable = true)
 |    |    |-- result: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Rtt: double (nullable = true)
 |    |    |    |    |-- Ttl: long (nullable = true)
 |    |    |    |    |-- dstoptsize: long (nullable = true)
 |    |    |    |    |-- dup: boolean (nullable = true)
 |    |    |    |    |-- edst: string (nullable = true)
 |    |    |    |    |-- err: string (nullable = true)
 |    |    |    |    |-- error: string (nullable = true)
 |    |    |    |    |-- flags: string (nullable = true)
 |    |    |    |    |-- from: string (nullable = true)
 |    |    |    |    |-- hdropts: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- mss: long (nullable = true)
 |    |    |    |    |-- icmpext: struct (nullable = true)
 |    |    |    |    |    |-- obj: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- class: long (nullable = true)
 |    |    |    |    |    |    |    |-- mpls: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |    |-- exp: long (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- label: long (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- s: long (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- ttl: long (nullable = true)
 |    |    |    |    |    |    |    |-- type: long (nullable = true)
 |    |    |    |    |    |-- rfc4884: long (nullable = true)
 |    |    |    |    |    |-- version: long (nullable = true)
 |    |    |    |    |-- itos: long (nullable = true)
 |    |    |    |    |-- ittl: long (nullable = true)
 |    |    |    |    |-- late: long (nullable = true)
 |    |    |    |    |-- mtu: long (nullable = true)
 |    |    |    |    |-- rtt: double (nullable = true)
 |    |    |    |    |-- sIze: long (nullable = true)
 |    |    |    |    |-- size: long (nullable = true)
 |    |    |    |    |-- tos: long (nullable = true)
 |    |    |    |    |-- ttl: long (nullable = true)
 |    |    |    |    |-- x: string (nullable = true)

How can I query nested column, like result.result.dstopsize for example? I would like to be able to display everything from result or even result.result or result.resuLt (case-sensitive is on in my spark config)

When I try:

file_df.select("result.resuLt.dstopsize").show(10)

I get this error:

cannot resolve '`result`.`resuLt`['dstopsize']' due to data type mismatch: argument 2 requires integral type, however, ''dstopsize'' is of string type.;;

EDIT: here is some sample data

|_corrupt_record| af|       dst_addr|       dst_name|   endtime|         from|  fw|group_id|lts|  msm_id|  msm_name|paris_id|prb_id|proto|              result|size|     src_addr| timestamp| ttr|      type|
+---------------+---+---------------+---------------+----------+-------------+----+--------+---+--------+----------+--------+------+-----+--------------------+----+-------------+----------+----+----------+
|           null|  4|213.133.109.134|213.133.109.134|1551658584|78.197.253.14|4940|    null| 71|    5019|Traceroute|       3| 13230|  UDP|[[, 1,, [[,,,,,,,...|  40|192.168.0.130|1551658577|null|traceroute|
|           null|  4|   37.143.33.15|   37.143.33.15|1551658584|78.197.253.14|4940|15254159| 71|15254159|Traceroute|      12| 13230| ICMP|[[, 1,, [[,,,,,,,...|  48|192.168.0.130|1551658583|null|traceroute|
|           null|  4|  139.162.27.28|  139.162.27.28|1551658612|78.197.253.14|4940|    null| 20|    5027|Traceroute|       3| 13230|  UDP|[[, 1,, [[,,,,,,,...|  40|192.168.0.130|1551658606|null|traceroute|
|           null|  4|    45.33.72.12|    45.33.72.12|1551658610|78.197.253.14|4940|    null| 18|    5029|Traceroute|       3| 13230|  UDP|[[, 1,, [[,,,,,,,...|  40|192.168.0.130|1551658608|null|traceroute|
|           null|  4|104.237.152.132|104.237.152.132|1551658615|78.197.253.14|4940|    null| 23|    5028|Traceroute|       3| 13230|  UDP|[[, 1,, [[,,,,,,,...|  40|192.168.0.130|1551658608|null|traceroute|
|           null|  4|  94.126.208.18|  94.126.208.18|1551658516|37.14.215.183|4940| 9183324| 20| 9183324|Traceroute|      15| 11958| ICMP|[[, 1,, [[,,,,,,,...|  48| 192.168.22.2|1551658439|null|traceroute|
|           null|  4|196.192.112.244|196.192.112.244|1551658554|37.14.215.183|4940| 9181461| 25| 9181461|Traceroute|      15| 11958| ICMP|[[, 1,, [[,,,,,,,...|  48| 192.168.22.2|1551658474|null|traceroute|
|           null|  4|    46.234.34.8|    46.234.34.8|1551658539|37.14.215.183|4940| 9180758| 10| 9180758|Traceroute|      15| 11958| ICMP|[[, 1,, [[,,,,,,,...|  48| 192.168.22.2|1551658479|null|traceroute|
|           null|  4|    185.2.64.76|    185.2.64.76|1551658560|37.14.215.183|4940| 9181290| 31| 9181290|Traceroute|      15| 11958| ICMP|[[, 1,, [[,,,,,,,...|  48| 192.168.22.2|1551658511|null|traceroute|
|           null|  4|  208.80.155.69|  208.80.155.69|1551658597|37.14.215.183|4940| 9183716|  8| 9183716|Traceroute|      15| 11958| ICMP|[[, 1,, [[,,,,,,,...|  48| 192.168.22.2|1551658546|null|traceroute|
+---------------+---+---------------+---------------+----------+-------------+----+--------+---+--------+----------+--------+------+-----+--------------------+----+-------------+----------+----+----------+```

Upvotes: 2

Views: 1614

Answers (2)

D3V
D3V

Reputation: 1593

The result is of array type so you will need to explode or explode_outer and then access whatever you need to access.

from pyspark.sql.functions import explode_outer, col
file_df.withColumn("exploded_result", explode_outer(col("result")))
       .select("exploded_result.resuLt.dstopsize").show(10)

Be cautious though, you will have multiple rows corresponding to individual row depending on number of elements.

Upvotes: 2

Michel Lemay
Michel Lemay

Reputation: 2094

This is atrociously ugly but that would return the correct answer given that you know which column indexes are your nested fields. That can be computed based on the schema.

case class C(x: String, dstoptsize: Long, y: String)
case class B(result: Array[C])
case class A(result: Array[B])

val df = List(
    A(Array(
      B(Array(C("x10", 10, "y10"), C("x11", 11, "y11"))),
      B(Array(C("x12", 12, "y12"), C("x13", 13, "y13")))
      )), 
    A(Array(B(Array(C("x20", 20, "y20"), C("x21", 21, "y21")))))).toDF


val selectInner = udf((x: Seq[Row]) => { x.map(_.getSeq[Row](0).map(_.getLong(1))) })

df.select(selectInner($"result")).show

+--------------------+
|         UDF(result)|
+--------------------+
|[[10, 11], [12, 13]]|
|          [[20, 21]]|
+--------------------+

Upvotes: 0

Related Questions