Reputation: 21
I'm using Zeppelin 0.6.2 and Spark 2.0.
I'm trying to execute a query inside a loop, and it's not very efficient.
I need to loop over each row of a DataFrame (around 5000 rows) and execute a query that increments a value in another DataFrame.
Here's my try at it:
val t2 = time
t2.registerTempTable("t2")
u.collect().foreach { r =>
  println(r(0))
  val c = r(1)
  val start = "\"" + r(2) + "\""
  val end = "\"" + r(3) + "\""
  sql("INSERT INTO TABLE t2 SELECT time, recordings + " + c + " AS recordings FROM time WHERE time >= " + start + " AND time < " + end)
}
I tried taking a tiny portion of the two DataFrames, but it is still really slow. I feel like I'm not doing this right.
Any idea how I can update a DataFrame quickly?
Upvotes: 1
Views: 2578
Reputation: 74669
I need to loop for each row of a dataframe, around 5000 rows and execute a query which will increment a value in another dataframe.
I could spot the u, time and t2 tables. t2 is an alias for time so you can later use it in the INSERT query. Right?
PROTIP: It'd be nice to have their schemas.
Let's assume you've got a 5000-row DataFrame called df5k:
// it's a fake 5k = a mere 5 rows for the sake of simplicity
// I think `u` is your 5k table (that you unnecessarily `collect` to `foreach`)
import spark.implicits._ // for toDF and the 'col / $"col" syntax

val u = Seq(
  (0, 0, 0, 3),
  (1, 3, 4, 5),
  (2, 6, 6, 8),
  (3, 9, 9, 17)).toDF("id", "c", "start", "end")

// I think `t2` is an alias for `time` and you want to update `t2`
val time = Seq(
  (1, 10),
  (4, 40),
  (9, 90)).toDF("time", "recordings")

// this is the calculation of the new records
// (a condition-less join is a cartesian product; depending on your
// Spark version you may need spark.sql.crossJoin.enabled=true)
val new_t2 = u.join(time)
  .where('time >= 'start)
  .where('time < 'end)
  .withColumn("recordings + c", 'recordings + 'c)
  .select('time, $"recordings + c" as 'recordings)

// the following is an equivalent of INSERT INTO using the Dataset API
val solution = time.union(new_t2)
Note: you are not updating a DataFrame, but creating a new DataFrame with the new values. Like INSERT INTO, union appends the new rows alongside the original ones.
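If you want the incremented values to replace the originals rather than be appended, you can aggregate the increments per time and left-join them back. This is a hedged sketch built on the same u and time DataFrames as above; increments, inc and updated are names I made up, and I haven't run it against your real data:

```scala
import org.apache.spark.sql.functions.{coalesce, lit, sum}
import spark.implicits._

// total increment to apply to each matching time value
// (a row of `time` may match several rows of `u`, hence the sum)
val increments = u.join(time, 'time >= 'start && 'time < 'end)
  .groupBy('time)
  .agg(sum('c) as "inc")

// left join keeps every original row; coalesce treats "no match" as +0
val updated = time.join(increments, Seq("time"), "left_outer")
  .withColumn("recordings", 'recordings + coalesce('inc, lit(0)))
  .drop("inc")
```

Joining on Seq("time") rather than an expression keeps a single time column in the result, so no de-duplication of columns is needed afterwards.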
Upvotes: 1