wandermonk

Reputation: 7356

Reading multiline JSON using Spark Dataset API

I would like to perform a join on two datasets using the join() method, but I am unable to understand how the join condition or the join column name needs to be specified.

public static void main(String[] args) {
        SparkSession spark = SparkSession
                  .builder()
                  .appName("Java Spark SQL basic example")
                  .master("spark://10.127.153.198:7077")
                  .getOrCreate();

        List<String> list = Arrays.asList("partyId");

        Dataset<Row> df = spark.read().text("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\alert.json");
        Dataset<Row> df2 = spark.read().text("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\contract.json");

        df.join(df2,JavaConversions.asScalaBuffer(list)).show();


//      df.join(df2, "partyId").show();

    }

When I execute the above code, I get this error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: USING column `partyId` cannot be resolved on the left side of the join. The left-side columns: [value];
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90$$anonfun$apply$56.apply(Analyzer.scala:1977)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90$$anonfun$apply$56.apply(Analyzer.scala:1977)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90.apply(Analyzer.scala:1976)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90.apply(Analyzer.scala:1975)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$commonNaturalJoinProcessing(Analyzer.scala:1975)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$$anonfun$apply$31.applyOrElse(Analyzer.scala:1961)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$$anonfun$apply$31.applyOrElse(Analyzer.scala:1958)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$.apply(Analyzer.scala:1958)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$.apply(Analyzer.scala:1957)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:62)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:63)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2822)
    at org.apache.spark.sql.Dataset.join(Dataset.scala:775)
    at org.apache.spark.sql.Dataset.join(Dataset.scala:748)
    at com.cisco.cdx.batch.JsonDataReader.main(JsonDataReader.java:27)

Both JSON files have the column "partyId". Please help.

Data:

Both files contain "partyId", but when I join the two datasets Spark is unable to find the column. Is there anything I am missing here?

Alerts.json

{
    "sourcePartyId": "SmartAccount_700001",
    "sourceSubPartyId": "",
    "partyId": "700001",
    "managedndjn": "BIZ_KEY_999001",
    "neAlert": {
        "data1": [{
            "sni": "c1f44bb6-e429-11e7-9afc-64609ee945d1"
        }],
        "data2": [{
            "sni": "c1f44bb6-e429-11e7-9afc-64609ee945d1"
        }],
        "data3": [{
            "sni": "c1f44bb6-e429-11e7-9afc-64609ee945d1",
            "ndjn": "999001"
        }],
        "advisory": [{
            "sni": "c1f44bb6-e429-11e7-9afc-64609ee945d1",
            "ndjn": "999001"
        }]
    }
}

Contracts.json

{
  "sourceSubPartyId": "",
  "partyId": "700001",
  "neContract": {
    "serialNumber": "FCH2013V245",
    "productId": "FS4000-K9",
    "coverageInfo": [
      {
        "billToCity": "Delhi",
        "billToCountry": "India",
        "billToPostalCode": "260001",
        "billToProvince": "",
        "slaCode": "1234"
      }
    ]
  }
}

But when I read the files the following way, I am able to print the data.

JavaRDD<Tuple2<String, String>> javaRDD = spark.sparkContext().wholeTextFiles("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\alert.json", 1).toJavaRDD();
List<Tuple2<String, String>> collect = javaRDD.collect();
        collect.forEach(x -> {
            System.out.println(x._1);
            System.out.println(x._2);
        });

Upvotes: 0

Views: 2473

Answers (2)

wandermonk

Reputation: 7356

The issue was resolved after making the JSON single-lined (or, equivalently, reading it with the multiLine option as below), so I would like to post my answer.

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import scala.collection.Seq;

public class JsonDataReader {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example")
                .master("spark://192.168.0.2:7077").getOrCreate();

//      JavaRDD<Tuple2<String, String>> javaRDD = spark.sparkContext().wholeTextFiles("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\alert.json", 1).toJavaRDD();

        // Columns shared by both files, converted to a Scala Seq for the join.
        Seq<String> joinColumns = scala.collection.JavaConversions
                  .asScalaBuffer(Arrays.asList("partyId", "sourcePartyId", "sourceSubPartyId", "wfid", "generatedAt", "collectorId"));

        // multiLine lets Spark parse each pretty-printed JSON file as a single record.
        Dataset<Row> df = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
                .json("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\alert.json");
        Dataset<Row> df2 = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
                .json("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\contract.json");

        // Inner join on the shared columns, then write the result back out as JSON.
        Dataset<Row> finalDS = df.join(df2, joinColumns, "inner");
        finalDS.write().mode(SaveMode.Overwrite).json("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\final.json");

//      List<Tuple2<String, String>> collect = javaRDD.collect();
//      collect.forEach(x -> {
//          System.out.println(x._1);
//          System.out.println(x._2);
//      });

    }

}
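
For reference, once the files are loaded with .json() and the multiLine option, the single-column form originally attempted in the question also resolves. A minimal sketch, reusing the df and df2 datasets from the code above:

// Join on the one column both sample files are known to share.
Dataset<Row> joined = df.join(df2, "partyId");
joined.show(false);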

However, @ShankarKoirala's answer was more precise and worked for me, so I accepted it.

Upvotes: 0

koiralo

Reputation: 23109

The problem is that you are reading the files as plain text with spark.read().text(), which gives each dataset a single string column named value (exactly what the error message shows).

If you want to read a JSON file directly into a DataFrame, you need to use

spark.read().json()

If the data is multi-line, you also need to add the option:

spark.read().option("multiLine", "true").json()

That's why you are not able to access the columns in the join.
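
Putting that advice together in Java, a minimal sketch might look like the following (the local[*] master and the relative paths are placeholders for illustration, not taken from the question):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JoinMultilineJson {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Join multiline JSON")
                .master("local[*]")               // placeholder master
                .getOrCreate();

        // multiLine makes Spark treat each pretty-printed file as one JSON record.
        Dataset<Row> alerts = spark.read()
                .option("multiLine", true)
                .json("data1/alert.json");        // placeholder path
        Dataset<Row> contracts = spark.read()
                .option("multiLine", true)
                .json("data1/contract.json");     // placeholder path

        // Both sides now expose "partyId", so the USING-column join resolves.
        alerts.join(contracts, "partyId").show(false);

        spark.stop();
    }
}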

Another way is to read the files as whole text files and convert the contents to JSON:

val jsonRDD = sc.wholeTextFiles("path to json").map(x => x._2)

spark.sqlContext.read.json(jsonRDD)
    .show(false)
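
In Java, assuming a SparkSession named spark already exists, the same approach might look roughly like this (a sketch, not part of the original answer):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// Read each file as a single (path, content) pair and keep only the content.
JavaRDD<String> jsonStrings = spark.sparkContext()
        .wholeTextFiles("path to json", 1)        // placeholder path, as in the Scala snippet
        .toJavaRDD()
        .map(t -> t._2());

// Parse the raw JSON strings into a DataFrame and inspect it.
Dataset<Row> df = spark.read()
        .json(spark.createDataset(jsonStrings.rdd(), Encoders.STRING()));
df.show(false);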

Upvotes: 1
