Reputation: 7366
I would like to know whether it is possible to create a customized JSON document using the Spark Dataset API
or any other function provided by Apache Spark. I am aware that I can join two Datasets using the join()
method, but I want to create a custom JSON in which Dataset 2 (alerts, in my case) is added to Dataset 1 (inventory) as a JSON object under the key "ALERT".
// Locations of the two input documents.
String inventoryPath = "C:\\Users\\phyadavi\\LearningAndDevelopment\\\\CDXJSONMergeJob\\data1\\inventory.json";
String alertsPath = "C:\\Users\\phyadavi\\LearningAndDevelopment\\\\CDXJSONMergeJob\\data1\\alert.json";

// Load each file with the multiLine option enabled and the PERMISSIVE parse mode.
Dataset<Row> inventory = spark.read()
        .option("multiLine", true)
        .option("mode", "PERMISSIVE")
        .json(inventoryPath);
Dataset<Row> alerts = spark.read()
        .option("multiLine", true)
        .option("mode", "PERMISSIVE")
        .json(alertsPath);

// Combine the two Datasets (no join condition is supplied here) and print the resulting schema.
Dataset<Row> inventoryAlerts = inventory.join(alerts);
inventoryAlerts.printSchema();
The schemas of inventory and alerts are shown below (inventory first, then alerts).
root
|-- Equipment: struct (nullable = true)
| |-- items: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- collectedPid: string (nullable = true)
| | | |-- collectedSerialNum: string (nullable = true)
| | | |-- containingHwId: string (nullable = true)
| | | |-- equipmentType: string (nullable = true)
| | | |-- hwId: string (nullable = true)
| | | |-- items: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- tagName: string (nullable = true)
| | | | | |-- tagValue: string (nullable = true)
| | | |-- pceMultiPid: string (nullable = true)
| | | |-- pcePhyiscalType: string (nullable = true)
| | | |-- pcePid: string (nullable = true)
| | | |-- pceProductDescription: string (nullable = true)
| | | |-- pceProductFamily: string (nullable = true)
| | | |-- pceProductType: string (nullable = true)
| | | |-- pceRuleId: string (nullable = true)
| | | |-- productDescription: string (nullable = true)
| | | |-- productFamily: string (nullable = true)
| | | |-- productId: string (nullable = true)
| | | |-- productType: string (nullable = true)
| | | |-- serialNumber: string (nullable = true)
| | | |-- snasItemType: string (nullable = true)
| | | |-- snasProductFamily: string (nullable = true)
| | | |-- snasSerialNumber: string (nullable = true)
| | | |-- snasValidationCode: string (nullable = true)
| | | |-- snasValidationSource: string (nullable = true)
|-- LicenseActivated: struct (nullable = true)
| |-- items: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- count: long (nullable = true)
| | | |-- type: string (nullable = true)
|-- NetworkElement: struct (nullable = true)
| |-- items: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- hostname: string (nullable = true)
| | | |-- ipAddress: string (nullable = true)
| | | |-- isManagedNe: boolean (nullable = true)
| | | |-- items: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- tagName: string (nullable = true)
| | | | | |-- tagValue: string (nullable = true)
| | | |-- lastUpdateDate: long (nullable = true)
| | | |-- managedNeId: string (nullable = true)
| | | |-- managementAddress: string (nullable = true)
| | | |-- neId: string (nullable = true)
| | | |-- neName: string (nullable = true)
| | | |-- neRegistrationStatus: string (nullable = true)
| | | |-- productFamily: string (nullable = true)
| | | |-- productId: string (nullable = true)
| | | |-- productType: string (nullable = true)
| | | |-- serialNumber: string (nullable = true)
| | | |-- smartLicenseProductInstanceIdentifier: string (nullable = true)
| | | |-- smartLicenseVirtualAccountName: string (nullable = true)
| | | |-- softwareType: string (nullable = true)
| | | |-- softwareVersion: string (nullable = true)
| | | |-- systemUptime: long (nullable = true)
| | | |-- udiProductIdentifier: string (nullable = true)
|-- Versions: struct (nullable = true)
| |-- items: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- lastUpdated: long (nullable = true)
| | | |-- type: string (nullable = true)
| | | |-- version: string (nullable = true)
|-- collectorId: string (nullable = true)
|-- generatedAt: long (nullable = true)
|-- managedNeId: string (nullable = true)
|-- partyId: string (nullable = true)
|-- recordType: string (nullable = true)
|-- sourceNeId: string (nullable = true)
|-- sourcePartyId: string (nullable = true)
|-- sourceSubPartyId: string (nullable = true)
|-- wfid: string (nullable = true)
#####################################
root
|-- collectorId: string (nullable = true)
|-- generatedAt: long (nullable = true)
|-- managedNeId: string (nullable = true)
|-- neAlert: struct (nullable = true)
| |-- advisory: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- equipmentType: string (nullable = true)
| | | |-- headlineName: string (nullable = true)
| | | |-- hwId: string (nullable = true)
| | | |-- neId: string (nullable = true)
| | | |-- productFamily: string (nullable = true)
| | | |-- productId: string (nullable = true)
| | | |-- psirtId: long (nullable = true)
| | | |-- publicReleaseInd: string (nullable = true)
| | | |-- softwareType: string (nullable = true)
| | | |-- softwareVersion: string (nullable = true)
| | | |-- vulnerabilityReason: string (nullable = true)
| | | |-- vulnerabilityStatus: string (nullable = true)
| |-- fieldNotice: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- caveat: string (nullable = true)
| | | |-- distributionCode: string (nullable = true)
| | | |-- equipmentType: string (nullable = true)
| | | |-- fieldNoticeId: long (nullable = true)
| | | |-- fieldNoticeName: string (nullable = true)
| | | |-- hwId: string (nullable = true)
| | | |-- neId: string (nullable = true)
| | | |-- productFamily: string (nullable = true)
| | | |-- productId: string (nullable = true)
| | | |-- serialNumber: string (nullable = true)
| | | |-- softwareType: string (nullable = true)
| | | |-- vulnerabilityReason: string (nullable = true)
| | | |-- vulnerabilityStatus: string (nullable = true)
| |-- hwEoX: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- bulletinName: string (nullable = true)
| | | |-- equipmentType: string (nullable = true)
| | | |-- hardwareEoXId: long (nullable = true)
| | | |-- hwId: string (nullable = true)
| | | |-- neId: string (nullable = true)
| | | |-- productId: string (nullable = true)
| |-- swEoX: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- bulletinHeadline: string (nullable = true)
| | | |-- equipmentType: string (nullable = true)
| | | |-- neId: string (nullable = true)
| | | |-- productId: string (nullable = true)
| | | |-- softwareEoXId: long (nullable = true)
| | | |-- softwareType: string (nullable = true)
| | | |-- softwareVersion: string (nullable = true)
|-- partyId: string (nullable = true)
|-- recordType: string (nullable = true)
|-- sourceNeId: string (nullable = true)
|-- sourcePartyId: string (nullable = true)
|-- sourceSubPartyId: string (nullable = true)
|-- wfid: string (nullable = true)
Upvotes: 0
Views: 1264
Reputation: 23119
If you want to join the datasets and keep the fields from one of them as a nested object, you can use struct
to create a StructType column and then join, as shown below:
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;

// Load the inventory document unchanged; its columns stay at the top level of the result.
Dataset<Row> inventory = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
        .json("path to json inventory");

// Load the alerts document and keep only the join key plus one nested struct column named "ALERTS".
// Replace "columns" with the comma-separated names of all the alert columns you want nested,
// e.g. struct("neAlert", "recordType").
Dataset<Row> alerts = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
        .json("path to alerts json")
        .select(col("partyId"), struct("columns").as("ALERTS"));

// Join on the shared partyId column. Without a join column the result would be a Cartesian
// product of the two Datasets and would contain two ambiguous partyId columns.
Dataset<Row> inventoryAlerts = inventory.join(alerts, "partyId");
inventoryAlerts.printSchema();
This should give you the required schema after the join.
Upvotes: 1