Reputation: 7356
I would like to perform a join on two datasets using the join() method, but I am unable to understand how the join condition or the join column name needs to be specified.
public static void main(String[] args) {
    SparkSession spark = SparkSession
            .builder()
            .appName("Java Spark SQL basic example")
            .master("spark://10.127.153.198:7077")
            .getOrCreate();

    List<String> list = Arrays.asList("partyId");

    Dataset<Row> df = spark.read().text("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\alert.json");
    Dataset<Row> df2 = spark.read().text("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\contract.json");

    df.join(df2, JavaConversions.asScalaBuffer(list)).show();
    // df.join(df2, "partyId").show();
}
When I execute the above code, I get this error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: USING column `partyId` cannot be resolved on the left side of the join. The left-side columns: [value];
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90$$anonfun$apply$56.apply(Analyzer.scala:1977)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90$$anonfun$apply$56.apply(Analyzer.scala:1977)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90.apply(Analyzer.scala:1976)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90.apply(Analyzer.scala:1975)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$commonNaturalJoinProcessing(Analyzer.scala:1975)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$$anonfun$apply$31.applyOrElse(Analyzer.scala:1961)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$$anonfun$apply$31.applyOrElse(Analyzer.scala:1958)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$.apply(Analyzer.scala:1958)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$.apply(Analyzer.scala:1957)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:62)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:63)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2822)
at org.apache.spark.sql.Dataset.join(Dataset.scala:775)
at org.apache.spark.sql.Dataset.join(Dataset.scala:748)
at com.cisco.cdx.batch.JsonDataReader.main(JsonDataReader.java:27)
Both JSON files have the column "partyId", but when I join the two datasets Spark is unable to find the column. Is there anything I am missing here? Please help.
Data:
Alerts.json
{
    "sourcePartyId": "SmartAccount_700001",
    "sourceSubPartyId": "",
    "partyId": "700001",
    "managedndjn": "BIZ_KEY_999001",
    "neAlert": {
        "data1": [{
            "sni": "c1f44bb6-e429-11e7-9afc-64609ee945d1"
        }],
        "daa2": [{
            "sni": "c1f44bb6-e429-11e7-9afc-64609ee945d1"
        }],
        "data3": [{
            "sni": "c1f44bb6-e429-11e7-9afc-64609ee945d1",
            "ndjn": "999001"
        }],
        "advisory": [{
            "sni": "c1f44bb6-e429-11e7-9afc-64609ee945d1",
            "ndjn": "999001"
        }]
    }
}
Contracts.json
{
    "sourceSubPartyId": "",
    "partyId": "700001",
    "neContract": {
        "serialNumber": "FCH2013V245",
        "productId": "FS4000-K9",
        "coverageInfo": [
            {
                "billToCity": "Delhi",
                "billToCountry": "India",
                "billToPostalCode": "260001",
                "billToProvince": "",
                "slaCode": "1234"
            }
        ]
    }
}
However, when I read the files the following way, I am able to print the data.
JavaRDD<Tuple2<String, String>> javaRDD = spark.sparkContext()
        .wholeTextFiles("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\alert.json", 1)
        .toJavaRDD();
List<Tuple2<String, String>> collect = javaRDD.collect();
collect.forEach(x -> {
    System.out.println(x._1);
    System.out.println(x._2);
});
Upvotes: 0
Views: 2473
Reputation: 7356
The issue was resolved after making the JSON single-lined, so I would like to post my answer.
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import scala.collection.Seq;

public class JsonDataReader {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example")
                .master("spark://192.168.0.2:7077").getOrCreate();

        // Columns shared by both files, converted to a Scala Seq for the join() overload
        Seq<String> joinColumns = scala.collection.JavaConversions
                .asScalaBuffer(Arrays.asList("partyId", "sourcePartyId", "sourceSubPartyId", "wfid", "generatedAt", "collectorId"));

        // Read the files as JSON (not text); multiLine lets Spark parse records that span several lines
        Dataset<Row> df = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
                .json("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\alert.json");
        Dataset<Row> df2 = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
                .json("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\contract.json");

        Dataset<Row> finalDS = df.join(df2, joinColumns, "inner");
        finalDS.write().mode(SaveMode.Overwrite).json("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\final.json");
    }
}
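As a side note, the join condition can also be spelled out explicitly instead of passing a Seq of column names. A minimal sketch, reusing the df and df2 datasets from the code above:

// Explicit join condition on the shared partyId column; unlike the Seq-based
// (USING-style) join, this form keeps both partyId columns in the result,
// so one of them may need to be dropped or renamed afterwards.
Dataset<Row> joined = df.join(df2, df.col("partyId").equalTo(df2.col("partyId")), "inner");
joined.show();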
However, @ShankarKoirala's answer was more precise and worked for me, so I accepted that answer.
Upvotes: 0
Reputation: 23109
The problem is that you are reading the files as text files with spark.read().text().
If you want to read a JSON file directly into a DataFrame, you need to use spark.read().json().
If the data is multiline, then you also need to add the option:
spark.read().option("multiline", "true").json()
That's why you are not able to access the columns in the join.
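For example, a minimal Java sketch of this approach; the class name JsonJoinExample and the local[*] master are placeholders rather than part of the original code, and the file paths are taken from the question:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JsonJoinExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Json join example")
                .master("local[*]")   // placeholder master just for this sketch
                .getOrCreate();

        // Read the files as JSON; multiLine is needed because each record spans several lines
        Dataset<Row> alerts = spark.read().option("multiLine", true)
                .json("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\alert.json");
        Dataset<Row> contracts = spark.read().option("multiLine", true)
                .json("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\contract.json");

        // With a proper schema in place, the single-column join resolves partyId on both sides
        alerts.join(contracts, "partyId").show();

        spark.stop();
    }
}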
Another way is to read the file as a text file and convert it to JSON:
val jsonRDD = sc.wholeTextFiles("path to json").map(x => x._2)
spark.sqlContext.read.json(jsonRDD)
.show(false)
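A rough Java equivalent of this approach, assuming an existing SparkSession named spark as in the question (it uses org.apache.spark.api.java.JavaRDD and org.apache.spark.sql.Encoders):

// Read each whole file as a single string, keeping only the content (x._2) and dropping the path
JavaRDD<String> jsonStrings = spark.sparkContext()
        .wholeTextFiles("path to json", 1)
        .toJavaRDD()
        .map(x -> x._2);

// Parse the JSON strings into a DataFrame
Dataset<Row> df = spark.read().json(
        spark.createDataset(jsonStrings.rdd(), Encoders.STRING()));
df.show(false);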
Upvotes: 1