Sam

Reputation: 21

How to update a Spark broadcast value on the worker nodes?

I have a broadcast value obtained from a database. I define the broadcast on the driver (master):

val stampsBroadcast = ssc.sparkContext.broadcast(stampListMap)

This value (stampsBroadcast.value) is used by the executors on the worker nodes. Once an executor finishes its task (adding a new key to the database), I need to update the broadcast value to include this new key.

I tried to use:

stampsBroadcast.unpersist(false)
ssc.sparkContext.broadcast(NewstampsBroadcastValue)

but it seems I can't use ssc on the worker nodes. If I re-broadcast on the driver instead, how can I get the new data back from the worker nodes?

Upvotes: 2

Views: 1362

Answers (2)

Tommy Duan

Reputation: 1

First: Upgrade Spark to 2.3.

Second: Turn stampListMap into a file stream (e.g. sparkStream.readStream.textFile("/../my.txt")), so the stream picks up new content by itself. And where you currently join a stream with static data, you can use a stream-stream join in Spark 2.3+, as sketched below.
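A minimal sketch of that idea with Structured Streaming (Spark 2.3+). The directory path, the Kafka source, and the column names are placeholders I made up for illustration, not taken from the question; the Kafka source also needs the spark-sql-kafka connector on the classpath:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("stamp-stream-join").getOrCreate()

// Stamp keys kept as text files in a directory (one key per line);
// dropping a new file there is how new keys become visible,
// replacing the static broadcast map.
val stamps = spark.readStream
  .text("/data/stamps")                         // hypothetical directory
  .withColumnRenamed("value", "stampKey")

// Hypothetical main event stream carrying the keys produced by the job.
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) AS stampKey", "timestamp")

// Spark 2.3+ allows joining two streams (inner stream-stream join shown).
val joined = events.join(stamps, Seq("stampKey"))

joined.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()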

Upvotes: 0

Amit Kumar

Reputation: 1584

You cannot create a broadcast variable from the worker nodes.

In your case, what you basically need is an accumulator. Define the accumulator on the driver; the worker nodes can then update its value, and you can read the updated value back on the driver.

Note: you cannot read the value of an accumulator on the worker nodes; workers can only update it. The value can only be read on the driver.

Below is an example from the Spark docs:

// creating the accumulator on driver
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

// updating the accumulator on worker nodes
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

// fetching the value on the driver
scala> accum.value
res2: Long = 10
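
Applied to your scenario, one possible pattern (a sketch only; it assumes stampListMap is a Map[String, String], a DStream of keys called stream, and a hypothetical saveNewKey helper that writes a key to the database) is to collect the new keys with a CollectionAccumulator on the executors and re-broadcast on the driver between batches:

import scala.collection.JavaConverters._

// Driver side: accumulator that collects new keys produced on the executors.
val newKeys = ssc.sparkContext.collectionAccumulator[String]("newStampKeys")

var currentStamps = stampListMap
var stampsBroadcast = ssc.sparkContext.broadcast(currentStamps)

stream.foreachRDD { rdd =>
  val current = stampsBroadcast              // capture the broadcast used for this batch
  rdd.foreach { key =>
    if (!current.value.contains(key)) {
      saveNewKey(key)                        // hypothetical DB write on the executor
      newKeys.add(key)                       // executors may only *add* to the accumulator
    }
  }

  // This part of foreachRDD runs on the driver: merge the collected keys
  // and replace the broadcast so the next batch sees them.
  val collected = newKeys.value.asScala.toList
  if (collected.nonEmpty) {
    currentStamps = currentStamps ++ collected.map(k => k -> k)
    stampsBroadcast.unpersist()
    stampsBroadcast = ssc.sparkContext.broadcast(currentStamps)
    newKeys.reset()
  }
}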

Upvotes: 2
