Ak040

Reputation: 171

Splitting contents of a DataFrame column using Spark 1.4 for nested JSON data

I am having issues splitting the contents of a DataFrame column using Spark 1.4. The DataFrame was created by reading a nested, complex JSON file. I used df.explode but keep getting an error message. The JSON file format is as follows:

[
    {
        "neid":{ },
        "mi":{
            "mts":"20100609071500Z",
            "gp":"900",
            "tMOID":"Aal2Ap",
            "mt":[ ],
            "mv":[
                {
                    "moid":"ManagedElement=1,TransportNetwork=1,Aal2Sp=1,Aal2Ap=r1552q",
                    "r":[ 1, 2, 5 ]
                },
                {
                    "moid":"ManagedElement=1,TransportNetwork=1,Aal2Sp=1,Aal2Ap=r1542q",
                    "r":[ 1, 2, 5 ]
                }
            ]
        }
    },
    {
        "neid":{
            "neun":"RC003",
            "nedn":"SubNetwork=ONRM_RootMo_R,SubNetwork=RC003,MeContext=RC003",
            "nesw":"CP90831_R9YC/11"
        },
        "mi":{
            "mts":"20100609071500Z",
            "gp":"900",
            "tMOID":"PlugInUnit",
            "mt":"pmProcessorLoad",
            "mv":[
                {
                    "moid":"ManagedElement=1,Equipment=1,Subrack=MS,Slot=6,PlugInUnit=1",
                    "r":[ 1, 2, 5 ]
                },
                {
                    "moid":"ManagedElement=1,Equipment=1,Subrack=ES-1,Slot=1,PlugInUnit=1",
                    "r":[ 1, 2, 5 ]
                }
            ]
        }
    }
]

I used the following code to load it in Spark 1.4:

scala> val df = sqlContext.read.json("/Users/xx/target/statsfile.json") 

scala> df.show() 
+--------------------+--------------------+ 
|                  mi|                neid| 
+--------------------+--------------------+ 
|[900,["pmEs","pmS...|[SubNetwork=ONRM_...| 
|[900,["pmIcmpInEr...|[SubNetwork=ONRM_...| 
|[900,pmUnsuccessf...|[SubNetwork=ONRM_...| 
|[900,["pmBwErrBlo...|[SubNetwork=ONRM_...| 
|[900,["pmSctpStat...|[SubNetwork=ONRM_...| 
|[900,["pmLinkInSe...|[SubNetwork=ONRM_...| 
|[900,["pmGrFc","p...|[SubNetwork=ONRM_...| 
|[900,["pmReceived...|[SubNetwork=ONRM_...| 
|[900,["pmIvIma","...|[SubNetwork=ONRM_...| 
|[900,["pmEs","pmS...|[SubNetwork=ONRM_...| 
|[900,["pmEs","pmS...|[SubNetwork=ONRM_...| 
|[900,["pmExisOrig...|[SubNetwork=ONRM_...| 
|[900,["pmHDelayVa...|[SubNetwork=ONRM_...| 
|[900,["pmReceived...|[SubNetwork=ONRM_...| 
|[900,["pmReceived...|[SubNetwork=ONRM_...| 
|[900,["pmAverageR...|[SubNetwork=ONRM_...| 
|[900,["pmDchFrame...|[SubNetwork=ONRM_...| 
|[900,["pmReceived...|[SubNetwork=ONRM_...| 
|[900,["pmNegative...|[SubNetwork=ONRM_...| 
|[900,["pmUsedTbsQ...|[SubNetwork=ONRM_...| 
+--------------------+--------------------+ 
scala> df.printSchema() 
root 
 |-- mi: struct (nullable = true) 
 |    |-- gp: long (nullable = true) 
 |    |-- mt: string (nullable = true) 
 |    |-- mts: string (nullable = true) 
 |    |-- mv: string (nullable = true) 
 |-- neid: struct (nullable = true) 
 |    |-- nedn: string (nullable = true) 
 |    |-- nesw: string (nullable = true) 
 |    |-- neun: string (nullable = true) 

scala> val df1=df.select("mi.mv").show() 
+--------------------+ 
|                  mv| 
+--------------------+ 
|[{"r":[0,0,0],"mo...| 
|{"r":[0,4,0,4],"m...| 
|{"r":5,"moid":"Ma...| 
|[{"r":[2147483647...| 
|{"r":[225,1112986...| 
|[{"r":[83250,0,0,...| 
|[{"r":[1,2,529982...| 
|[{"r":[26998564,0...| 
|[{"r":[0,0,0,0,0,...| 
|[{"r":[0,0,0],"mo...| 
|[{"r":[0,0,0],"mo...| 
|{"r":[0,0,0,0,0,0...| 
|{"r":[0,0,1],"moi...| 
|{"r":[4587,4587],...| 
|[{"r":[180,180],"...| 
|[{"r":["0,0,0,0,0...| 
|{"r":[0,35101,0,0...| 
|[{"r":["0,0,0,0,0...| 
|[{"r":[0,1558],"m...| 
|[{"r":["7484,4870...| 
+--------------------+ 

scala> df1.explode("mv","mvnew")(mv: String => mv.split(",")) 
<console>:1: error: ')' expected but '(' found. 
       df1.explode("mv","mvnew")(mv: String => mv.split(",")) 
                                                       ^ 
<console>:1: error: ';' expected but ')' found. 
       df1.explode("mv","mvnew")(mv: String => mv.split(",")) 

Am I doing something wrong? I need to extract the data under mi.mv into separate columns so I can apply some transformations.

Upvotes: 1

Views: 6730

Answers (2)

Dean

Reputation: 309

I know this is old, but I have a solution that may be useful to someone searching for a solution to this problem (as I was). I have been using Spark 1.5 built with Scala 2.10.4.

It appears to just be a formatting thing. I was able to reproduce all of the errors above, and what worked for me was:

df1.explode("mv","mvnew"){mv: String => mv.asInstanceOf[String].split(",")} 

I don't entirely understand why I need to define mv as a String twice, and if anyone would care to explain that I'd be interested, but this should enable someone to explode a DataFrame column.
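
One possible explanation, offered as a tentative reading of the Scala parser rather than anything from the Spark docs: inside plain parentheses, mv: String => mv.split(",") is parsed as a type ascription against the function type String => ..., which is what produces the ')' expected but '(' found error in the question. Braces, or an extra set of parentheses around the typed parameter, make it parse as a function literal instead, and in that form the cast may not even be necessary:

// Both forms parse; the typed parameter needs braces or its own parentheses
df1.explode("mv","mvnew"){mv: String => mv.split(",")}
df1.explode("mv","mvnew")((mv: String) => mv.split(","))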

One more gotcha: if you are splitting on a regex special character (say a "?"), you need to escape it with a double backslash. So in the above, splitting on a "?" would give:

df1.explode("mv","mvnew"){mv: String => mv.asInstanceOf[String].split("\\?")} 

I hope this helps someone somewhere.

Upvotes: 2

bjfletcher

Reputation: 11508

Remove the String typing of mv like so:

df1.explode("mv","mvnew")(mv => mv.split(","))

because the typing is already in the explode definition.

Update (see comment)

Then you get a different error, because df1 is of type Unit, not DataFrame. You can fix that as follows:

val df1=df.select("mi.mv")
df1.show()

df1.explode...

That's because show() returns Unit, and it was that return value you previously attempted to run explode on. The above ensures that you run explode on the actual DataFrame.
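
For completeness, a minimal end-to-end sketch combining this fix with the typed-parameter form from the first answer (the path and column names are taken from the question):

val df = sqlContext.read.json("/Users/xx/target/statsfile.json")

// Keep the DataFrame in df1; show() returns Unit, so call it separately
val df1 = df.select("mi.mv")
df1.show()

// Produce one output row per comma-separated element of mv
val exploded = df1.explode("mv","mvnew"){mv: String => mv.split(",")}
exploded.show()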

Upvotes: 1
