Reputation: 41
I am a newbie to Scala. I have a custom class, Analytics (in Analytics.scala), which has a few variables (var a, var b, var c). I get a DataStream of type Analytics in my test case, and I want to set the value of var c to 0 for every object.
I've tried using map function over DataStream but it didn't help. I also tried converting stream to list and then iterating over that list but that didn't work either.
stream is of type DataStream[Analytics]. This is what I have tried:
stream.map(x => x.c=0)
val a = DataStreamUtils.collect(stream.javaStream).asScala.toArray.iterator
a.foreach(x => x.c=0)
In both cases, the value of var c doesn't change to 0 in my test case.
Upvotes: 2
Views: 1111
Reputation: 43717
In general, a Flink DataStream isn't a finite collection you can iterate over once and be done -- it's a potentially unbounded stream that just keeps having more data.
Using a map is the right way to go. But when you apply a map to a stream, as in
stream.map(x => x.c=0)
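you are describing a stream transformation, not modifying the stream itself, and the transformed stream is thrown away unless you keep a reference to it. The function also has to return x, because the assignment x.c = 0 on its own evaluates to Unit. You should instead try
val streamWhereCisZero = stream.map { x => x.c = 0; x }
This creates a new stream where every element will have c set to zero.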
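One detail worth checking when you capture the mapped stream: in Scala an assignment such as x.c = 0 evaluates to Unit, so the function passed to map has to return the element if the result should still contain Analytics objects. The following is only a minimal sketch of that behaviour. It uses a plain List as a stand-in for DataStream[Analytics] (so it runs without Flink), and the Analytics class with Int fields and the object name are assumptions, not the asker's actual code.

```scala
// Hypothetical stand-in for the Analytics class from the question (field types assumed).
class Analytics(var a: Int, var b: Int, var c: Int)

object MapAssignmentSketch {
  // A plain List stands in for DataStream[Analytics]; map has the same shape here.
  def sample(): List[Analytics] =
    List(new Analytics(1, 2, 3), new Analytics(4, 5, 6))

  // The assignment evaluates to Unit, so every element is mapped to ().
  val units: List[Unit] = sample().map(x => x.c = 0)

  // Returning x after the assignment keeps the element type.
  val zeroed: List[Analytics] = sample().map { x => x.c = 0; x }

  def main(args: Array[String]): Unit = {
    println(zeroed.map(_.c)) // prints List(0, 0)
  }
}
```

With Flink the same lambda shape should apply, i.e. stream.map { x => x.c = 0; x }, assuming the usual org.apache.flink.streaming.api.scala._ import is in scope so that the type information for Analytics can be derived.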
Upvotes: 3
Reputation: 41
This is how I iterated. Not sure if this is the best solution.
val collection = DataStreamUtils.collect(stream.javaStream)
val results: Seq[Analytics] = collection.asScala.toSeq
for (result <- results){
result.c=0
}
Upvotes: 0