I want to receive TV data in a stream and run some operations on it.
First, I created a class as the data type:
class Customer(val customerID: Int, val TimeStamp: Int, val Channel: Int)
When the stream receives data, I turn it into key/value pairs (RDD[(k, v)]), where k is customerID + TimeStamp and v is the Customer object.
The first question: how do I use mapWithState to collect all the data (so that I can run operations such as statistics on it)?
The second: if I have a function
rateLookup(Channel, TimeStamp)
that lists all the data with the same Channel and TimeStamp,
how can a user trigger this function while the stream is running?
Can the streaming job detect user input and then execute the function?
The first question is how to use mapWithState to collect all data?
You need to use StateSpec.function to create the mapping function for mapWithState:
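import org.apache.spark.streaming.State

def analyzeCustomer(customerId: Int,
                    data: Option[Seq[Customer]],
                    state: State[Customer]): Option[Customer] = {
  // Do stuff: combine data with state.getOption(), call state.update(...)
  // to store the result, and return the value to emit for this key.
  state.getOption()
}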
Here, State[T] holds the data you keep in memory between batches, and data is what you receive from the DStream for that key in the current batch. Now call it like this:
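import org.apache.spark.streaming.StateSpec
val spec = StateSpec.function(analyzeCustomer _)
// mapWithState is defined on key/value DStreams (here DStream[(Int, Seq[Customer])]), not on RDDs
val stateStream = dstream.mapWithState(spec)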
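Since the question asks how to collect all the data, one option is to make the state a Seq[Customer] instead of a single Customer and append each batch to it. The sketch below models that update rule in plain Scala so it runs without a Spark cluster. It assumes Customer is a case class (so records compare by value); mergeState, replay and CollectAllSketch are hypothetical names, not Spark API.

```scala
// Assumption: Customer as a case class, so records compare by value.
case class Customer(customerID: Int, TimeStamp: Int, Channel: Int)

// Plain-Scala model of the per-key state that mapWithState keeps; no Spark needed.
object CollectAllSketch {
  // Hypothetical update rule a mapping function could apply: append the
  // records received for this customer to whatever was stored before.
  def mergeState(previous: Option[Seq[Customer]],
                 incoming: Option[Seq[Customer]]): Seq[Customer] =
    previous.getOrElse(Seq.empty) ++ incoming.getOrElse(Seq.empty)

  // Replays micro-batches keyed by customerID, as the state store would see
  // them, and returns the final accumulated state per key.
  def replay(batches: Seq[Seq[Customer]]): Map[Int, Seq[Customer]] =
    batches.foldLeft(Map.empty[Int, Seq[Customer]]) { (state, batch) =>
      batch.groupBy(_.customerID).foldLeft(state) { case (acc, (id, records)) =>
        acc.updated(id, mergeState(acc.get(id), Some(records)))
      }
    }
}
```

Inside a real mapping function declared with state: State[Seq[Customer]], the same rule would be state.update(mergeState(state.getOption(), data)). Appending every record makes the state grow without bound, so a timeout or periodic pruning is usually needed.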
how to let user trigger this function while the streaming is running?
There are many ways to do this. One is to write the state to a persistent data store and have the user interface read from that store. You can do this with foreachRDD:
dstream.mapWithState(spec)
  .foreachRDD(rdd => {
    // Output state to persistent storage.
  })
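To illustrate the decoupling, here is a plain-Scala sketch in which the streaming side appends each batch to a store and the user-facing code calls rateLookup on that store, so the stream itself never has to listen for user input. CustomerStore is hypothetical and stands in for a real database; the case-class Customer is likewise an assumption.

```scala
import scala.collection.mutable.ArrayBuffer

// Assumption: Customer as a case class, so records compare by value.
case class Customer(customerID: Int, TimeStamp: Int, Channel: Int)

// Hypothetical stand-in for the persistent store that foreachRDD would write to.
// A real deployment would use a database rather than an in-memory buffer.
final class CustomerStore {
  private val records = ArrayBuffer.empty[Customer]

  // Called from the streaming side (e.g. inside foreachRDD) after each batch.
  def saveBatch(batch: Seq[Customer]): Unit = synchronized {
    records ++= batch
  }

  // The query from the question, triggered by the user whenever they like:
  // all records on one channel at one time stamp.
  def rateLookup(channel: Int, timeStamp: Int): List[Customer] = synchronized {
    records.filter(c => c.Channel == channel && c.TimeStamp == timeStamp).toList
  }
}
```

With Spark, the saving step would sit inside foreachRDD; MapWithStateDStream.stateSnapshots() gives the full state on every batch if you want to persist that rather than only the mapped values.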