wandermonk
wandermonk

Reputation: 7366

How to add a JSON object to a Dataset/DataFrame in Apache Spark

I would like to know whether it is possible to create a customized JSON using the Spark Dataset API or any other function provided by Apache Spark. I am aware that I can join two Datasets using the join() method, but I want to create a custom JSON in which Dataset 2 (alerts, in my case) is added to Dataset 1 (inventory) as a JSON object with the key "ALERT".

// Locations of the two multi-line JSON documents to combine.
String inventoryPath = "C:\\Users\\phyadavi\\LearningAndDevelopment\\\\CDXJSONMergeJob\\data1\\inventory.json";
String alertsPath = "C:\\Users\\phyadavi\\LearningAndDevelopment\\\\CDXJSONMergeJob\\data1\\alert.json";

// Load each document with the same reader options.
Dataset<Row> inventory = spark.read()
        .option("multiLine", true)
        .option("mode", "PERMISSIVE")
        .json(inventoryPath);
Dataset<Row> alerts = spark.read()
        .option("multiLine", true)
        .option("mode", "PERMISSIVE")
        .json(alertsPath);

// Join the two datasets without a join condition, then print the resulting schema.
Dataset<Row> inventoryAlerts = inventory.join(alerts);
inventoryAlerts.printSchema();

The schemas of inventory and alerts are shown below.

root
 |-- Equipment: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- collectedPid: string (nullable = true)
 |    |    |    |-- collectedSerialNum: string (nullable = true)
 |    |    |    |-- containingHwId: string (nullable = true)
 |    |    |    |-- equipmentType: string (nullable = true)
 |    |    |    |-- hwId: string (nullable = true)
 |    |    |    |-- items: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- tagName: string (nullable = true)
 |    |    |    |    |    |-- tagValue: string (nullable = true)
 |    |    |    |-- pceMultiPid: string (nullable = true)
 |    |    |    |-- pcePhyiscalType: string (nullable = true)
 |    |    |    |-- pcePid: string (nullable = true)
 |    |    |    |-- pceProductDescription: string (nullable = true)
 |    |    |    |-- pceProductFamily: string (nullable = true)
 |    |    |    |-- pceProductType: string (nullable = true)
 |    |    |    |-- pceRuleId: string (nullable = true)
 |    |    |    |-- productDescription: string (nullable = true)
 |    |    |    |-- productFamily: string (nullable = true)
 |    |    |    |-- productId: string (nullable = true)
 |    |    |    |-- productType: string (nullable = true)
 |    |    |    |-- serialNumber: string (nullable = true)
 |    |    |    |-- snasItemType: string (nullable = true)
 |    |    |    |-- snasProductFamily: string (nullable = true)
 |    |    |    |-- snasSerialNumber: string (nullable = true)
 |    |    |    |-- snasValidationCode: string (nullable = true)
 |    |    |    |-- snasValidationSource: string (nullable = true)
 |-- LicenseActivated: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- count: long (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |-- NetworkElement: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- hostname: string (nullable = true)
 |    |    |    |-- ipAddress: string (nullable = true)
 |    |    |    |-- isManagedNe: boolean (nullable = true)
 |    |    |    |-- items: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- tagName: string (nullable = true)
 |    |    |    |    |    |-- tagValue: string (nullable = true)
 |    |    |    |-- lastUpdateDate: long (nullable = true)
 |    |    |    |-- managedNeId: string (nullable = true)
 |    |    |    |-- managementAddress: string (nullable = true)
 |    |    |    |-- neId: string (nullable = true)
 |    |    |    |-- neName: string (nullable = true)
 |    |    |    |-- neRegistrationStatus: string (nullable = true)
 |    |    |    |-- productFamily: string (nullable = true)
 |    |    |    |-- productId: string (nullable = true)
 |    |    |    |-- productType: string (nullable = true)
 |    |    |    |-- serialNumber: string (nullable = true)
 |    |    |    |-- smartLicenseProductInstanceIdentifier: string (nullable = true)
 |    |    |    |-- smartLicenseVirtualAccountName: string (nullable = true)
 |    |    |    |-- softwareType: string (nullable = true)
 |    |    |    |-- softwareVersion: string (nullable = true)
 |    |    |    |-- systemUptime: long (nullable = true)
 |    |    |    |-- udiProductIdentifier: string (nullable = true)
 |-- Versions: struct (nullable = true)
 |    |-- items: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- lastUpdated: long (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |    |-- version: string (nullable = true)
 |-- collectorId: string (nullable = true)
 |-- generatedAt: long (nullable = true)
 |-- managedNeId: string (nullable = true)
 |-- partyId: string (nullable = true)
 |-- recordType: string (nullable = true)
 |-- sourceNeId: string (nullable = true)
 |-- sourcePartyId: string (nullable = true)
 |-- sourceSubPartyId: string (nullable = true)
 |-- wfid: string (nullable = true)

#####################################
root
 |-- collectorId: string (nullable = true)
 |-- generatedAt: long (nullable = true)
 |-- managedNeId: string (nullable = true)
 |-- neAlert: struct (nullable = true)
 |    |-- advisory: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- equipmentType: string (nullable = true)
 |    |    |    |-- headlineName: string (nullable = true)
 |    |    |    |-- hwId: string (nullable = true)
 |    |    |    |-- neId: string (nullable = true)
 |    |    |    |-- productFamily: string (nullable = true)
 |    |    |    |-- productId: string (nullable = true)
 |    |    |    |-- psirtId: long (nullable = true)
 |    |    |    |-- publicReleaseInd: string (nullable = true)
 |    |    |    |-- softwareType: string (nullable = true)
 |    |    |    |-- softwareVersion: string (nullable = true)
 |    |    |    |-- vulnerabilityReason: string (nullable = true)
 |    |    |    |-- vulnerabilityStatus: string (nullable = true)
 |    |-- fieldNotice: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- caveat: string (nullable = true)
 |    |    |    |-- distributionCode: string (nullable = true)
 |    |    |    |-- equipmentType: string (nullable = true)
 |    |    |    |-- fieldNoticeId: long (nullable = true)
 |    |    |    |-- fieldNoticeName: string (nullable = true)
 |    |    |    |-- hwId: string (nullable = true)
 |    |    |    |-- neId: string (nullable = true)
 |    |    |    |-- productFamily: string (nullable = true)
 |    |    |    |-- productId: string (nullable = true)
 |    |    |    |-- serialNumber: string (nullable = true)
 |    |    |    |-- softwareType: string (nullable = true)
 |    |    |    |-- vulnerabilityReason: string (nullable = true)
 |    |    |    |-- vulnerabilityStatus: string (nullable = true)
 |    |-- hwEoX: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- bulletinName: string (nullable = true)
 |    |    |    |-- equipmentType: string (nullable = true)
 |    |    |    |-- hardwareEoXId: long (nullable = true)
 |    |    |    |-- hwId: string (nullable = true)
 |    |    |    |-- neId: string (nullable = true)
 |    |    |    |-- productId: string (nullable = true)
 |    |-- swEoX: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- bulletinHeadline: string (nullable = true)
 |    |    |    |-- equipmentType: string (nullable = true)
 |    |    |    |-- neId: string (nullable = true)
 |    |    |    |-- productId: string (nullable = true)
 |    |    |    |-- softwareEoXId: long (nullable = true)
 |    |    |    |-- softwareType: string (nullable = true)
 |    |    |    |-- softwareVersion: string (nullable = true)
 |-- partyId: string (nullable = true)
 |-- recordType: string (nullable = true)
 |-- sourceNeId: string (nullable = true)
 |-- sourcePartyId: string (nullable = true)
 |-- sourceSubPartyId: string (nullable = true)
 |-- wfid: string (nullable = true)

Upvotes: 0

Views: 1264

Answers (1)

koiralo
koiralo

Reputation: 23119

If you want to join the datasets and keep the fields from one of them as a nested struct, you can use struct to create a StructType column and then join, as shown below:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.udf;

// Read the inventory records unchanged; they form the top level of the joined result.
Dataset<Row> inventory = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
                    .json("path to json inventory");

// From the alerts, keep the join key (partyId) and pack the columns to be nested
// into a single struct column named "ALERTS".
// "columns" is a placeholder: replace it with the comma-separated names of the
// columns you want in the nested field, e.g. struct("neAlert", "generatedAt").
Dataset<Row> alerts = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
                           .json("path to alerts json")
                           .select(col("partyId"), struct("columns").as("ALERTS"));

// Join on partyId explicitly. join(alerts) with no join column has no predicate,
// which makes it a Cartesian product (some Spark versions reject that by default).
Dataset<Row> inventoryAlerts = inventory.join(alerts, "partyId");
inventoryAlerts.printSchema();

This should give you the required schema after the join.

Upvotes: 1

Related Questions