Reputation: 25
I have two RDDs in Scala and have converted them to DataFrames.
Now I have two DataFrames. The first, prodUniqueDF
, has two columns named prodid
and uid
; it holds the master data for products.
scala> prodUniqueDF.printSchema
root
|-- prodid: string (nullable = true)
|-- uid: long (nullable = false)
The second, ratingsDF
, has columns named prodid
, custid
, and ratings
scala> ratingsDF.printSchema
root
|-- prodid: string (nullable = true)
|-- custid: string (nullable = true)
|-- ratings: integer (nullable = false)
I want to join these two and replace ratingsDF.prodid
with prodUniqueDF.uid
in ratingsDF.
To do this, I first registered them as temp tables:
prodUniqueDF.registerTempTable("prodUniqueDF")
ratingsDF.registerTempTable("ratingsDF")
Then I ran the following:
val testSql = sql("SELECT prodUniqueDF.uid, ratingsDF.custid, ratingsDF.ratings FROM prodUniqueDF, ratingsDF WHERE prodUniqueDF.prodid = ratingsDF.prodid")
But I get the error:
org.apache.spark.sql.AnalysisException: Table not found: prodUniqueDF; line 1 pos 66
Please help! How can I achieve the join? Is there another method to map RDDs instead?
Upvotes: 1
Views: 1203
Reputation: 3544
Joining the DataFrames can easily be achieved. The format is
DataFrameA.join(DataFrameB)
By default it performs an inner join, but you can also specify the type of join you want, and there are APIs for that. You can look here for more information:
http://spark.apache.org/docs/latest/api/scala/#org.apache.spark.sql.DataFrame
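For example, a minimal sketch using the question's DataFrames (prodid is assumed to be the shared key):
// inner join by default
val joined = ratingsDF.join(prodUniqueDF, ratingsDF("prodid") === prodUniqueDF("prodid"))
// the join type can be passed explicitly, e.g. a left outer join
val leftJoined = ratingsDF.join(prodUniqueDF, ratingsDF("prodid") === prodUniqueDF("prodid"), "left_outer")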
For replacing the values in an existing column, you can use the withColumn method from the API.
It would be something like this:
val newDF = dfA.withColumn("newColumnName", dfB("columnName")).drop("columnName").withColumnRenamed("newColumnName", "columnName")
I think this might do the trick!
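Putting the join and the column replacement together for the question's schemas, a hedged sketch could look like this (DataFrame and column names are taken from the question; the default inner join is assumed):
// join on the shared prodid key, so only one prodid column remains,
// then keep uid in place of prodid alongside custid and ratings
val replacedDF = ratingsDF
  .join(prodUniqueDF, "prodid")
  .select("uid", "custid", "ratings")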
Upvotes: 1