Reputation: 103
env: Spark 2.4.5
Source JSON:
{
    "a_key": "1",
    "a_pro": "2",
    "a_con": "3",
    "b_key": "4",
    "b_pro": "5",
    "b_con": "6",
    "c_key": "7",
    "c_pro": "8",
    "c_con": "9",
    ...
}
Target JSON:
{
    "factors": [
        {
            "name": "a",
            "key": "1",
            "pros": "2",
            "cons": "3"
        },
        {
            "name": "b",
            "key": "4",
            "pros": "5",
            "cons": "6"
        },
        {
            "name": "c",
            "key": "7",
            "pros": "8",
            "cons": "9"
        },
        ...
    ]
}
As you can see, the target 'name' is embedded in the source keys. For instance, 'a' is the 'name' of 'a_key', 'a_pro', and 'a_con'. I really don't know how to extract a value from a key and then do a 'group by' transformation. Can anybody give me a suggestion?
Upvotes: 0
Views: 83
Reputation: 31
Your data is unusual, but the following code can help you transform it:
source.json:
{
    "a_key": "1",
    "a_pro": "2",
    "a_con": "3",
    "b_key": "4",
    "b_pro": "5",
    "b_con": "6",
    "c_key": "7",
    "c_pro": "8",
    "c_con": "9"
}
code:
import java.util
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import com.google.gson.GsonBuilder

// Define these at top level so Spark can serialize them
case class Person(name: String, key: String, pros: String, cons: String)
case class Factors(factors: util.List[Person])

val sparkSession = SparkSession.builder()
  .appName("readAndWriteJsonTest")
  .master("local[*]")
  .getOrCreate()

// Read the pretty-printed JSON as plain text, one line per record
val lines = sparkSession.read.textFile("R:\\data\\source.json")

// Keep only the "key": "value" lines and parse each into (name, (field, value))
val mapRdd: RDD[(String, (String, String))] = lines.rdd
  .filter(_.split(":").length == 2)
  .map(line => {
    val Array(key1, value1) = line.split(":")
    val Array(name, key2) = key1.replace("\"", "").trim.split("_")
    val value2 = value1.replace("\"", "").replace(",", "").trim
    (name, (key2, value2))
  })
// mapRdd.collect().foreach(println)

// Collect the (field, value) pairs of each name into one buffer
val initValue = new ArrayBuffer[(String, String)]
val seqOp = (buffer: ArrayBuffer[(String, String)], t: (String, String)) => buffer += t
val combOp = (b1: ArrayBuffer[(String, String)], b2: ArrayBuffer[(String, String)]) => b1 ++= b2
val aggRdd: RDD[(String, ArrayBuffer[(String, String)])] = mapRdd.aggregateByKey(initValue)(seqOp, combOp)
// aggRdd.collect().foreach(println)

// Assumes key/pro/con appear in their source order within each group
val persons: util.List[Person] = aggRdd.map(line => {
  val name = line._1
  val keyValue = line._2(0)._2
  val prosValue = line._2(1)._2
  val consValue = line._2(2)._2
  Person(name, keyValue, prosValue, consValue)
}).collect().toList.asJava

val gson = new GsonBuilder().create
val targetJsonStr = gson.toJson(Factors(persons))
println(targetJsonStr)
target.json:
{
    "factors": [
        {
            "name": "a",
            "key": "1",
            "pros": "2",
            "cons": "3"
        },
        {
            "name": "b",
            "key": "4",
            "pros": "5",
            "cons": "6"
        },
        {
            "name": "c",
            "key": "7",
            "pros": "8",
            "cons": "9"
        }
    ]
}
You can put the above code into a test method and run it to see the result:
@Test
def readAndSaveJsonTest(): Unit = {
    // paste the code above here
}
Hope it can help you.
Upvotes: 0
Reputation: 1015
No need to involve DataFrames for this; some simple string and dictionary manipulation will do:
import json

source = {
    "a_key": "1",
    "a_pro": "2",
    "a_con": "3",
    "b_key": "4",
    "b_pro": "5",
    "b_con": "6",
    "c_key": "7",
    "c_pro": "8",
    "c_con": "9",
}

factors = {}
# Group the items by factor name; rename 'pro'/'con' to 'pros'/'cons' to match the target
rename = {'pro': 'pros', 'con': 'cons'}
for k, v in source.items():
    factor, item = k.split('_')
    d = factors.setdefault(factor, {})
    d[rename.get(item, item)] = v

# Prepare result dictionary
target = {
    'factors': []
}

# Move the name attribute into each dictionary & append
for k, v in factors.items():
    v['name'] = k
    target['factors'].append(v)

result = json.dumps(target)
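Printing the result then gives something like this (the key order inside each factor follows insertion order, so it may differ from the target's, but the JSON is equivalent):
print(result)
# {"factors": [{"key": "1", "pros": "2", "cons": "3", "name": "a"}, ...]}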
Upvotes: 0
Reputation: 5526
IIUC, first create a dataframe from the input JSON:
json_data = {
    "a_key": "1",
    "a_pro": "2",
    "a_con": "3",
    "b_key": "4",
    "b_pro": "5",
    "b_con": "6",
    "c_key": "7",
    "c_pro": "8",
    "c_con": "9"
}
df = spark.createDataFrame(list(map(list, json_data.items())), ['key', 'value'])
df.show()
+-----+-----+
| key|value|
+-----+-----+
|a_key| 1|
|a_pro| 2|
|a_con| 3|
|b_key| 4|
|b_pro| 5|
|b_con| 6|
|c_key| 7|
|c_pro| 8|
|c_con| 9|
+-----+-----+
Now derive the required columns from the existing key column:
import pyspark.sql.functions as f
# split on '_' so that names longer than one character also work
df2 = df.withColumn('Name', f.split('key', '_')[0]).\
    withColumn('Attributes', f.concat(f.split('key', '_')[1], f.lit('s')))
df2.show()
+-----+-----+----+----------+
| key|value|Name|Attributes|
+-----+-----+----+----------+
|a_key| 1| a| keys|
|a_pro| 2| a| pros|
|a_con| 3| a| cons|
|b_key| 4| b| keys|
|b_pro| 5| b| pros|
|b_con| 6| b| cons|
|c_key| 7| c| keys|
|c_pro| 8| c| pros|
|c_con| 9| c| cons|
+-----+-----+----+----------+
Now pivot the dataframe and collect the result as a JSON object:
output_json = df2.groupBy('Name').\
    pivot('Attributes').\
    agg(f.min('value')).\
    select(f.collect_list(f.struct('Name', 'keys', 'cons', 'pros')).alias('factors')).\
    toJSON().collect()

import json
print(json.dumps(json.loads(output_json[0]), indent=4))
{
    "factors": [
        {
            "Name": "c",
            "keys": "7",
            "cons": "9",
            "pros": "8"
        },
        {
            "Name": "b",
            "keys": "4",
            "cons": "6",
            "pros": "5"
        },
        {
            "Name": "a",
            "keys": "1",
            "cons": "3",
            "pros": "2"
        }
    ]
}
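Note that the row order after groupBy/pivot is not deterministic (here the factors came back as c, b, a). If you need them sorted by Name, one option (an untested sketch) is to wrap the collect_list in sort_array, which orders the structs by their first field:
# same pivot as above, with the collected array sorted by the structs' first field (Name)
output_json = df2.groupBy('Name').\
    pivot('Attributes').\
    agg(f.min('value')).\
    select(f.sort_array(f.collect_list(f.struct('Name', 'keys', 'cons', 'pros'))).alias('factors')).\
    toJSON().collect()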
Upvotes: 1