I'm querying a cached Hive temp table more than 1500 times inside a for loop, using a different query for each of several conditions. I need to merge all the results with unionAll inside the loop, but I get a StackOverflowError because the RDD lineage grows too long for Spark to keep up with.
Pseudocode:
df = [from a hive table]
tableA = [from a hive table]
tableA.registerTempTable("tableA")
sqlContext.sql('CACHE TABLE tableA')  # sqlContext is the HiveContext instance
for i in range(0, 2000):
    if list[0]['column1'] == 'xyz':
        df1 = query something from tableA
        df = df.unionAll(df1)
    elif (...):
        df1 = query something from tableA
        df = df.unionAll(df1)
    elif (...):
        df1 = query something from tableA
        df = df.unionAll(df1)
    elif (...):
        df1 = query something from tableA
        df = df.unionAll(df1)
    else:
        df1 = query something from tableA
        df = df.unionAll(df1)
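A loop like this makes the plan one level deeper on every iteration. The toy model below is plain Python with no Spark involved; `union_all`, `plan_depth` and the tuple-based plan are hypothetical stand-ins. Each `unionAll` wraps the previous plan as the left child of a new node, so after 2000 iterations the plan is a left-deep chain 2001 levels tall. Roughly speaking, any code that walks or serializes such a structure recursively needs about one stack frame per level, which is how a JVM thread stack can run out.

```python
def union_all(left, right):
    """Toy stand-in for DataFrame.unionAll: returns a new plan node
    whose children are the two input plans (nothing is computed)."""
    return ("Union", left, right)


def plan_depth(plan):
    """Height of a plan tree (leaves count as 1), computed iteratively so
    that this helper itself cannot hit Python's recursion limit."""
    max_depth = 0
    stack = [(plan, 1)]
    while stack:
        node, depth = stack.pop()
        max_depth = max(max_depth, depth)
        if isinstance(node, tuple) and node[0] == "Union":
            stack.append((node[1], depth + 1))
            stack.append((node[2], depth + 1))
    return max_depth


# Mimic the loop in the question: start from one scan and union 2000 more.
df = "scan(tableA)"
for i in range(2000):
    df1 = f"query_{i}(tableA)"  # placeholder for each per-iteration query
    df = union_all(df, df1)

print(plan_depth(df))  # 2001
```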
This throws a StackOverflowError because the RDD lineage becomes too deep. So I tried checkpointing as follows:
for i in range(0, 2000):
    if list[0]['column1'] == 'xyz':
        df1 = query something from tableA
        df = df.unionAll(df1)
    elif (...):
        df1 = query something from tableA
        df = df.unionAll(df1)
    else:
        df1 = query something from tableA
        df = df.unionAll(df1)
    df.rdd.checkpoint
    df = sqlContext.createDataFrame(df.rdd, df.schema)
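Two details in the snippet above are worth checking. `df.rdd.checkpoint` has no parentheses, so it only looks up the method and never calls it. Even when called, `RDD.checkpoint()` just marks the RDD: a directory must first be set with `sc.setCheckpointDir(...)`, the call must come before any job has run on that RDD, and the lineage is only cut once an action (for example `count()`) has executed on it. Spark 2.1 and later also offer `DataFrame.checkpoint()`, which is eager by default. The plain-Python toy model below (class and function names are hypothetical, no Spark involved) illustrates why periodic checkpointing helps: truncating the lineage every `k` iterations keeps its depth at most `k + 1` instead of letting it grow with the loop.

```python
class ToyPlan:
    """Hypothetical stand-in for a DataFrame: holds the rows it would produce
    and the depth of the lineage needed to recompute them."""

    def __init__(self, rows, depth=1):
        self.rows = list(rows)
        self.depth = depth

    def union_all(self, other):
        # A union adds one level on top of the deeper of its two inputs.
        return ToyPlan(self.rows + other.rows, max(self.depth, other.depth) + 1)

    def checkpoint(self):
        # Like RDD.checkpoint() followed by an action: the rows are kept,
        # but the parents are forgotten, so the lineage depth resets to 1.
        return ToyPlan(self.rows, depth=1)


def run_loop(iterations, checkpoint_every=None):
    """Union one toy result per iteration, optionally checkpointing every
    `checkpoint_every` iterations; returns the final plan and the deepest
    lineage seen along the way."""
    df = ToyPlan([])
    max_depth = df.depth
    for i in range(iterations):
        df1 = ToyPlan([i])  # placeholder for "query something from tableA"
        df = df.union_all(df1)
        max_depth = max(max_depth, df.depth)
        if checkpoint_every and (i + 1) % checkpoint_every == 0:
            df = df.checkpoint()
    return df, max_depth


no_ckpt, depth_without = run_loop(2000)
with_ckpt, depth_with = run_loop(2000, checkpoint_every=100)
print(depth_without, depth_with)  # 2001 101
print(no_ckpt.rows == with_ckpt.rows)  # True: same data either way
```

In real Spark a checkpoint writes the data to the checkpoint directory, so doing it on every iteration costs I/O much like `saveAsTable`; checkpointing every few dozen or hundred iterations is a common compromise, but the right interval is something to measure on your own data.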
I got the same error. So I tried saveAsTable, which I had always wanted to avoid because of the job-submission lag between HQL queries and the Hive I/O inside the loop. But this approach worked well:
for i in range(0, 2000):
    if list[0]['column1'] == 'xyz':
        df = query something from tableA
        df.write.saveAsTable('output', mode='append')
    elif (...):
        df = query something from tableA
        df.write.saveAsTable('output', mode='append')
I need help avoiding saving the DataFrame into Hive inside the loop. I want to merge the DataFrames in a way that is in-memory and efficient. Another option I tried was inserting each query result directly into a temp table, but that fails with the error: cannot insert into a RDD based table.
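One in-memory option, sketched here under the assumption that each per-iteration result can first be appended to a Python list, is to stop unioning inside the loop and combine the list afterwards as a balanced tree. With 2000 inputs this needs 11 rounds of pairwise unions, so the plan is about 12 levels deep instead of 2001. The helper `union_balanced` is hypothetical and takes the binary union as a parameter, so it can be exercised with plain Python values here; with DataFrames you would pass something like `lambda a, b: a.unionAll(b)`.

```python
def union_balanced(items, union):
    """Combine `items` with the binary function `union` as a balanced tree.

    Each round unions neighbours pairwise, so n inputs need about log2(n)
    rounds and the result is only that many levels deep, instead of n levels
    for a running `df = df.unionAll(df1)`. Input order is preserved.
    """
    if not items:
        raise ValueError("need at least one item to union")
    level = list(items)
    while len(level) > 1:
        paired = [union(level[j], level[j + 1]) for j in range(0, len(level) - 1, 2)]
        if len(level) % 2 == 1:
            paired.append(level[-1])  # odd one out moves up unchanged
        level = paired
    return level[0]


def depth(plan):
    """Height of a nested ("Union", left, right) tuple; leaves have depth 1."""
    if isinstance(plan, tuple):
        return 1 + max(depth(plan[1]), depth(plan[2]))
    return 1


# Lists stand in for the per-iteration DataFrames; `+` stands in for unionAll.
parts = [[i] for i in range(2000)]
merged = union_balanced(parts, lambda a, b: a + b)
print(merged == list(range(2000)))  # True

# The same helper building plan nodes shows how shallow the result is.
plan = union_balanced([f"q{i}" for i in range(2000)], lambda a, b: ("Union", a, b))
print(depth(plan))  # 12
```

A flat alternative worth trying, assuming every result has the same schema, is `sc.union([d.rdd for d in dfs])` followed by `sqlContext.createDataFrame(..., schema)`, where `dfs` is a hypothetical list filled inside the loop: `SparkContext.union` combines all the RDDs in a single union rather than nesting them. Either approach is a starting point to test against your Spark version and data size, not a guaranteed fix.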
Answer:
Maybe a temp table for the result will work:
df1 = sqlContext.sql("query something from tableA")
df1.registerTempTable("result")
sqlContext.sql("INSERT INTO result query something from tableA")