Mike

Reputation: 197

Merge large number of spark dataframes into one

I'm querying a cached Hive temp table with different queries, satisfying different conditions, more than 1500 times inside a for loop. I need to merge all the results with unionAll inside the loop, but I get a StackOverflowError because Spark cannot keep up with the growing RDD lineage.

pseudo code:

df=[from a hive table]
tableA=[from a hive table]
tableA.registerTempTable("tableA")
sqlContext.sql('CACHE TABLE tableA')

for i in range(0, 2000):
    if (list[0]['column1'] == 'xyz'):
        df1 = query something from tableA
        df = df.unionAll(df1)
    elif (...):
        df1 = query something from tableA
        df = df.unionAll(df1)
    elif (...):
        df1 = query something from tableA
        df = df.unionAll(df1)
    elif (...):
        df1 = query something from tableA
        df = df.unionAll(df1)
    else:
        df1 = query something from tableA
        df = df.unionAll(df1)

This throws a StackOverflowError because the RDD lineage becomes too long. So I tried checkpointing as follows:

for i in range(0, 2000):
    if (list[0]['column1'] == 'xyz'):
        df1 = query something from tableA
        df = df.unionAll(df1)
    elif (...):
        df1 = query something from tableA
        df = df.unionAll(df1)
    else:
        df1 = query something from tableA
        df = df.unionAll(df1)
    df.rdd.checkpoint()  # checkpoint is a method call; it also needs sc.setCheckpointDir(...) and an action to take effect
    df = sqlContext.createDataFrame(df.rdd, df.schema)

I got the same error. So I tried saveAsTable, which I always wanted to avoid because of the lag in job submission between each HQL query and the Hive I/O inside a loop. But this approach worked well.

for i in range(0, 2000):
    if (list[0]['column1'] == 'xyz'):
        df = query something from tableA
        df.write.saveAsTable('output', mode='append')
    elif (...):
        df = query something from tableA
        df.write.saveAsTable('output', mode='append')

I need help avoiding saving the DataFrame into Hive inside the loop. I want to merge the DataFrames in some way that is in-memory and efficient. One other option I tried was inserting the query result directly into a temp table, for which I get an error: cannot insert into an RDD-based table.
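For reference, one pattern that stays in memory is to collect the per-iteration results in a Python list and union them pairwise after the loop, so the union-plan depth grows logarithmically instead of linearly. The sketch below is illustrative, not Spark API: `FakeDF`, `union_all`, and the `depth` field are stand-ins so the folding logic runs without a cluster. With real DataFrames you would append each query result to `dfs` inside the loop and call `union_all(dfs)` once at the end.

```python
# Stand-in for a Spark DataFrame so the merge pattern can run anywhere:
# 'rows' holds the data, 'depth' models the union-plan (lineage) depth
# that causes the StackOverflowError.  Illustrative only, not Spark API.
class FakeDF(object):
    def __init__(self, rows, depth=1):
        self.rows, self.depth = rows, depth

    def unionAll(self, other):
        return FakeDF(self.rows + other.rows,
                      max(self.depth, other.depth) + 1)

def union_all(dfs):
    # Pairwise (tournament) merge: depth grows O(log n) rather than the
    # O(n) of chaining df = df.unionAll(df1) inside the loop.
    while len(dfs) > 1:
        dfs = [dfs[i].unionAll(dfs[i + 1]) if i + 1 < len(dfs) else dfs[i]
               for i in range(0, len(dfs), 2)]
    return dfs[0]

parts = [FakeDF([i]) for i in range(2000)]   # one DataFrame per iteration
merged = union_all(parts)
print(merged.depth)   # 12 levels deep instead of 2000
```

The same `union_all` works unchanged on real DataFrames, since it only relies on `unionAll`.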

Upvotes: 1

Views: 680

Answers (1)

pasha701

Reputation: 7207

Maybe a temp table for the result will work:

df1 = query something from tableA
df1.registerTempTable("result")
sqlContext.sql("INSERT INTO TABLE result query something from tableA")
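If the INSERT INTO route keeps hitting the "cannot insert into an RDD-based table" error mentioned in the question, a variant that stays in memory is to fold the per-query DataFrames together and register the final result once. `union_all` below is a hypothetical helper, not Spark API:

```python
from functools import reduce

def union_all(dfs):
    # Left fold: df1.unionAll(df2).unionAll(df3)...
    # Fine for a modest number of DataFrames; for ~2000, prefer a
    # pairwise merge or an occasional checkpoint to keep the plan shallow.
    return reduce(lambda a, b: a.unionAll(b), dfs)
```

With Spark this would be used as `result = union_all(parts)` followed by `result.registerTempTable("result")`, replacing the INSERT INTO statement.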

Upvotes: 0
