Reputation: 21
I have a broadcast value obtained from a database. I define the broadcast on the driver:
val stampsBroadcast = ssc.sparkContext.broadcast(stampListMap)
This value (stampsBroadcast.value) is used by the executors on the worker nodes. Once an executor finishes its task (adding a new key to the database), I need to update the broadcast value to include this new key.
I tried to use:
stampsBroadcast.unpersist(false)
ssc.sparkContext.broadcast(NewstampsBroadcastValue)
but it seems I can't use ssc on the worker nodes. If I re-broadcast on the driver, how can I get the new data from the worker nodes?
Upvotes: 2
Views: 1362
Reputation: 1
First: upgrade Spark to 2.3.
Second: read stampListMap as a file stream (spark.readStream.textFile("/../my.txt")); a file stream refreshes its content by itself. And where you currently join a stream with static data, Spark 2.3+ lets you use a stream-stream join instead.
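A minimal sketch of that idea, assuming Spark 2.3+ Structured Streaming; the SparkSession variable, paths, and column names are illustrative, not from the question:
// Treat the stamp list as a self-updating file stream and join the event
// stream against it (stream-stream joins require Spark 2.3+).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("StampJoin").getOrCreate()
import spark.implicits._

// Any new file written into these directories is picked up automatically,
// so the stamp data refreshes without any re-broadcasting.
val stamps = spark.readStream.textFile("/data/stamps").toDF("stampKey") // hypothetical path
val events = spark.readStream.textFile("/data/events").toDF("eventKey") // hypothetical path

// Unbounded inner stream-stream join; in production, add watermarks so the
// join state does not grow forever.
val joined = events.join(stamps, $"eventKey" === $"stampKey")
joined.writeStream.format("console").start().awaitTermination()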
Upvotes: 0
Reputation: 1584
You cannot create a broadcast variable from the worker nodes.
In your case, what you basically need is an accumulator. Define the accumulator on the driver; the worker nodes can update its value, and you can then fetch the updated value back on the driver.
Note: you cannot read the value of an accumulator on the worker nodes; workers can only update it.
Below is an example from the Spark docs:
// creating the accumulator on driver
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
// updating the accumulator on worker nodes
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
// fetching the value back on the driver
scala> accum.value
res2: Long = 10
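To tie this back to the question: a common pattern (sketched here under the assumption that stampListMap is a Map[String, String]; the accumulator name and the derived map values are illustrative) is to collect new keys on the workers with a CollectionAccumulator, then unpersist and re-broadcast the updated map on the driver, for example once per batch:
import scala.collection.JavaConverters._
import org.apache.spark.broadcast.Broadcast

// Driver side: an accumulator for the new keys plus a mutable reference
// to the current broadcast.
val newKeysAcc = ssc.sparkContext.collectionAccumulator[String]("newKeys")
var stampsBroadcast: Broadcast[Map[String, String]] =
  ssc.sparkContext.broadcast(stampListMap)

// Executor side, inside the task: read the broadcast, record unseen keys.
// rdd.foreach { key =>
//   if (!stampsBroadcast.value.contains(key)) newKeysAcc.add(key)
// }

// Driver side again, e.g. at the end of each batch: fold the accumulated
// keys into the map, drop the stale broadcast, and broadcast the new one.
val newKeys = newKeysAcc.value.asScala.toList
if (newKeys.nonEmpty) {
  val updated = stampsBroadcast.value ++ newKeys.map(k => k -> k) // derive real values here
  stampsBroadcast.unpersist(blocking = false)
  stampsBroadcast = ssc.sparkContext.broadcast(updated)
  newKeysAcc.reset()
}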
Upvotes: 2