user1189851

Reputation: 5041

Fetch data from Hive table into Spark and perform join on RDDs

I have two tables in Hive/Impala. I want to fetch the data from these tables into Spark as RDDs and perform, say, a join operation.

I do not want to pass the join query directly to my HiveContext; this is just an example. I have more use cases that are not possible with standard HiveQL. How do I fetch all rows, access the columns, and perform transformations?

Suppose I have two RDDs:

val table1 = hiveContext.hql("select * from tem1")
val table2 = hiveContext.hql("select * from tem2")

I want to join these RDDs on a column called "account_id".

Ideally, using the spark-shell, I want to do something like this with the RDDs:

select * from tem1 join tem2 on tem1.account_id=tem2.account_id; 

Upvotes: 3

Views: 16246

Answers (4)

BadBoy777

Reputation: 155

You can select just the column you want and join on it directly, as in the following code:

val table1 = hiveContext.hql("select account_id from tem1")
val table2 = hiveContext.hql("select account_id from tem2")
// Join on the shared account_id column; join() with no key would produce a cartesian product
val joinedTable = table1.join(table2, "account_id")

Upvotes: 0

Daniel de Paula

Reputation: 17872

I'm not sure I understood the question, but as an alternative you can use the API to join DataFrames, so many things can be decided programmatically (e.g. the join function can be passed as a parameter to a method that applies a custom transformation, as sketched below).

For your example, it would be like this:

val table1 = hiveContext.sql("select * from tem1")
val table2 = hiveContext.sql("select * from tem2")
val common_attributes = Seq("account_id")
val joined = table1.join(table2, common_attributes)
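
Taking the idea one step further, the join itself can be passed around as a function. A minimal sketch; the applyTransformation helper is illustrative, not part of the Spark API, and assumes Spark 1.4+:

import org.apache.spark.sql.DataFrame

// Hypothetical helper: applies whatever two-table transformation is passed in
def applyTransformation(t1: DataFrame, t2: DataFrame)
                       (transform: (DataFrame, DataFrame) => DataFrame): DataFrame =
  transform(t1, t2)

// The join is now just one possible transformation among many
val customJoined = applyTransformation(table1, table2) { (a, b) =>
  a.join(b, Seq("account_id"))
}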

There are many common transformations available in the DataFrame API: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame

Cheers

Upvotes: 1

Blaubaer

Reputation: 664

table1 and table2 are of type DataFrame. It is possible to convert them into RDDs using:

lazy val table1_rdd = table1.rdd
lazy val table2_rdd = table2.rdd

This should do the trick. On these RDDs you can apply any RDD operation; for example, the join from the question can be reproduced as sketched below.
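
A minimal sketch, assuming account_id is a Long and a Spark version where Row.getAs accepts a field name:

// Key both RDDs by the join column, then use the pair-RDD join
val t1ByAccount = table1_rdd.keyBy(row => row.getAs[Long]("account_id"))
val t2ByAccount = table2_rdd.keyBy(row => row.getAs[Long]("account_id"))

// RDD[(Long, (Row, Row))]: each value pairs the matching rows from tem1 and tem2
val joinedRdd = t1ByAccount.join(t2ByAccount)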

See also: https://issues.apache.org/jira/browse/SPARK-6608 and https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame

Upvotes: 0

Holden

Reputation: 7452

So we could register table1 and table2 as temporary tables and then run the join on those temporary tables:

table1.registerTempTable("t1")
table2.registerTempTable("t2")
val table3 = hiveContext.sql("select * from t1 join t2 on t1.account_id = t2.account_id")
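
If transformations beyond HiveQL are needed afterwards (the original motivation in the question), the joined result can be taken down to an RDD as well. A minimal sketch, assuming table3 is a DataFrame:

// The underlying RDD of Rows supports arbitrary transformations
val joinedRows = table3.rdd
joinedRows.take(5).foreach(println)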

Upvotes: 1
