David

Reputation: 103

How can I do such transformation?

env: Spark 2.4.5

source.json:

{
    "a_key": "1",
    "a_pro": "2",
    "a_con": "3",
    "b_key": "4",
    "b_pro": "5",
    "b_con": "6",
    "c_key": "7",
    "c_pro": "8",
    "c_con": "9",
    ...
}

target.json:

{
    "factors": [
        {
            "name": "a",
            "key": "1",
            "pros": "2",
            "cons": "3"
        },
        {
            "name": "b",
            "key": "4",
            "pros": "5",
            "cons": "6"
        },
        {
            "name": "c",
            "key": "7",
            "pros": "8",
            "cons": "9"
        },
        ...
    ]
}

As you can see, the target 'name' comes from the prefix of the source keys. For instance, 'a' is the 'name' for 'a_key', 'a_pro', and 'a_con'. I don't know how to extract that value from the key and then do a 'group by'-style transformation. Can anybody give me a suggestion?

Upvotes: 0

Views: 83

Answers (3)

kai.tian

Reputation: 31

Your data is strange, but the following code can help you solve it:

source.json:

{
    "a_key": "1",
    "a_pro": "2",
    "a_con": "3",
    "b_key": "4",
    "b_pro": "5",
    "b_con": "6",
    "c_key": "7",
    "c_pro": "8",
    "c_con": "9"
}

code:

import java.util

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Case classes used below to build the target JSON with Gson.
// Define them at top level (outside the test method) so Gson can serialize them cleanly.
case class Person(name: String, key: String, pros: String, cons: String)
case class Factors(factors: util.List[Person])

val sparkSession = SparkSession.builder()
  .appName("readAndWriteJsonTest")
  .master("local[*]").getOrCreate()

// Read the file as plain text (one line per record), since the keys/values are parsed by hand below
val dataFrame = sparkSession.read.text("R:\\data\\source.json")

// println(dataFrame.rdd.count())

// Turn each line  "x_field": "value"  into (name, (field, value)), e.g. ("a", ("key", "1"))
val mapRdd: RDD[(String, (String, String))] = dataFrame.rdd.map(_.getString(0))
  .filter(_.split("\\:").length == 2)
  .map(line => {
    val Array(key1, value1) = line.split("\\:")
    val Array(name, key2) = key1.replace("\"", "").trim.split("\\_")
    val value2 = value1.replace("\"", "").replace(",", "").trim
    (name, (key2, value2))
  })

// mapRdd.collect().foreach(println)

val initValue = new ArrayBuffer[(String, String)]

val function1 = (buffer1: ArrayBuffer[(String, String)], t1: (String, String)) => buffer1.+=(t1)
val function2 = (buffer1: ArrayBuffer[(String, String)], buffer2: ArrayBuffer[(String, String)]) => buffer1.++(buffer2)

// Group the (field, value) pairs by name
val aggRdd: RDD[(String, ArrayBuffer[(String, String)])] = mapRdd.aggregateByKey(initValue)(function1, function2)

// aggRdd.collect().foreach(println)

import scala.collection.JavaConverters._
val persons: util.List[Person] = aggRdd.map(line => {
  val name = line._1
  // Assumes each group holds its (key, pro, con) pairs in that order
  val keyValue = line._2(0)._2
  val prosValue = line._2(1)._2
  val consValue = line._2(2)._2

  Person(name, keyValue, prosValue, consValue)
}).collect().toList.asJava


import com.google.gson.GsonBuilder
val gson = new GsonBuilder().setPrettyPrinting().create()

val factors = Factors(persons)

val targetJsonStr = gson.toJson(factors)

println(targetJsonStr)

target.json:

{
  "factors": [
  {
    "name": "a",
    "key": "1",
    "pros": "2",
    "cons": "3"
  },
  {
    "name": "b",
    "key": "4",
    "pros": "5",
    "cons": "6"
  },
  {
    "name": "c",
    "key": "7",
    "pros": "8",
    "cons": "9"
  }
  ]
}

You can put the above code into a test method like the one below and run it to see the result you want.

  @Test
  def readAndSaveJsonTest: Unit = {}

Hope it can help you.

Upvotes: 0

rgk

Reputation: 1015

No need to involve dataframes for this; some simple string and dictionary manipulation will do:

import json

source = {
    "a_key": "1",
    "a_pro": "2",
    "a_con": "3",
    "b_key": "4",
    "b_pro": "5",
    "b_con": "6",
    "c_key": "7",
    "c_pro": "8",
    "c_con": "9",
}

factors = {}

# Prepare each factor dictionary, mapping the source suffixes to the target field names
suffix_map = {'key': 'key', 'pro': 'pros', 'con': 'cons'}
for k, v in source.items():
    factor, item = k.split('_')
    d = factors.get(factor, {})
    d[suffix_map[item]] = v
    factors[factor] = d

# Prepare result dictionary
target = {
    'factors': []
}

# Move name attribute into dictionary & append
for k, v in factors.items():
    d = v
    d['name'] = k
    target['factors'].append(d)

result = json.dumps(target)
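
A quick check of the result (output shown here assumes Python 3.7+, where dicts keep insertion order):

print(json.dumps(target, indent=4))

{
    "factors": [
        {
            "key": "1",
            "pros": "2",
            "cons": "3",
            "name": "a"
        },
        {
            "key": "4",
            "pros": "5",
            "cons": "6",
            "name": "b"
        },
        {
            "key": "7",
            "pros": "8",
            "cons": "9",
            "name": "c"
        }
    ]
}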

Upvotes: 0

Shubham Jain

Reputation: 5526

IIUC, first create the dataframe from the input JSON:

json_data = {
    "a_key": "1",
    "a_pro": "2",
    "a_con": "3",
    "b_key": "4",
    "b_pro": "5",
    "b_con": "6",
    "c_key": "7",
    "c_pro": "8",
    "c_con": "9"
}
df=spark.createDataFrame(list(map(list,json_data.items())),['key','value'])
df.show()

+-----+-----+
|  key|value|
+-----+-----+
|a_key|    1|
|a_pro|    2|
|a_con|    3|
|b_key|    4|
|b_pro|    5|
|b_con|    6|
|c_key|    7|
|c_pro|    8|
|c_con|    9|
+-----+-----+
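
If the data actually sits in a source.json file (as in the question) rather than a Python dict, a sketch of one way to build the same key/value dataframe, assuming the file is a single multi-line JSON object (path assumed, adjust to your setup):

# Read the whole object as one row, then turn it into (key, value) pairs
row = spark.read.option('multiLine', True).json('source.json').first().asDict()
df = spark.createDataFrame(list(row.items()), ['key', 'value'])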

Now create the required columns from the existing key column:

import pyspark.sql.functions as f
df2 = df.withColumn('Name', f.substring('key',1,1)).\
         withColumn('Attributes', f.concat(f.split('key','_')[1],f.lit('s')))
df2.show()
+-----+-----+----+----------+
|  key|value|Name|Attributes|
+-----+-----+----+----------+
|a_key|    1|   a|      keys|
|a_pro|    2|   a|      pros|
|a_con|    3|   a|      cons|
|b_key|    4|   b|      keys|
|b_pro|    5|   b|      pros|
|b_con|    6|   b|      cons|
|c_key|    7|   c|      keys|
|c_pro|    8|   c|      pros|
|c_con|    9|   c|      cons|
+-----+-----+----+----------+

Now pivot the dataframe and collect the result as a JSON object:

output_json = df2.groupBy('Name').\
                  pivot('Attributes').\
                  agg(f.min('value')).\
                  select(f.collect_list(f.struct('Name','keys','cons','pros')).alias('factors')).\
                  toJSON().collect()

import json
print(json.dumps(json.loads(output_json[0]),indent=4))

{
    "factors": [
        {
            "Name": "c",
            "keys": "7",
            "cons": "9",
            "pros": "8"
        },
        {
            "Name": "b",
            "keys": "4",
            "cons": "6",
            "pros": "5"
        },
        {
            "Name": "a",
            "keys": "1",
            "cons": "3",
            "pros": "2"
        }
    ]
}
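
Note that the field names here are 'Name' and 'keys' rather than the 'name' and 'key' shown in target.json. If the exact names matter, one possible tweak is to alias the columns inside the struct before collecting:

output_json = df2.groupBy('Name').\
                  pivot('Attributes').\
                  agg(f.min('value')).\
                  select(f.collect_list(f.struct(f.col('Name').alias('name'),
                                                 f.col('keys').alias('key'),
                                                 f.col('pros'),
                                                 f.col('cons'))).alias('factors')).\
                  toJSON().collect()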

Upvotes: 1
