Akhil Agarwal

Reputation: 41

How to iterate over DataStream

I am new to Scala. I have a custom class Analytics (in Analytics.scala) with a few variables (var a, var b, var c). In my test case I get a DataStream of type Analytics, and I want to set var c to 0 for every object.

I've tried using a map function over the DataStream, but it didn't help. I also tried converting the stream to a list and iterating over that list, but that didn't work either.

stream is of type DataStream[Analytics]. This is what I have tried:

stream.map(x => x.c=0)
val a = DataStreamUtils.collect(stream.javaStream).asScala.toArray.iterator
a.foreach(x => x.c=0)

The value of var c doesn't change to 0 in my test case.

Upvotes: 2

Views: 1111

Answers (2)

David Anderson

Reputation: 43717

In general, a Flink DataStream isn't a finite collection you can iterate over once and be done -- it's a potentially unbounded stream that just keeps having more data.

Using a map is the right way to go. But when you apply a map to a stream, as in

stream.map(x => x.c=0)

you are describing a stream transformation, and not modifying the stream itself. You should instead try

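val streamWhereCisZero = stream.map { x => x.c = 0; x }

This creates a new stream in which every element has c set to zero once the job runs. The function has to return x after the assignment: an assignment on its own evaluates to Unit, so x => x.c = 0 would produce a stream of Unit values rather than Analytics objects.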
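To see why the original lambda does not help, note that in Scala an assignment such as x.c = 0 evaluates to Unit, so a map function whose body is only that assignment yields Unit values. The sketch below demonstrates this with an ordinary Scala List standing in for the DataStream (a Flink map is typed by its function in the same way). The Analytics class and its Int fields are assumptions, since the question does not show the class definition.

```scala
// Hypothetical stand-in for the asker's class; field types are assumed to be Int.
class Analytics(var a: Int, var b: Int, var c: Int)

object MapReturnsUnit {
  def main(args: Array[String]): Unit = {
    // A plain List used in place of a DataStream[Analytics], for illustration only.
    val input = List(new Analytics(1, 2, 3), new Analytics(4, 5, 6))

    // The assignment is the last expression, so each element maps to Unit.
    val wrong: List[Unit] = input.map(x => x.c = 0)

    // Returning x after the assignment keeps the element type Analytics.
    val right: List[Analytics] = input.map { x => x.c = 0; x }

    println(wrong)          // List((), ())
    println(right.map(_.c)) // List(0, 0)
  }
}
```

The typed vals make the difference visible at compile time: only the second form gives back a collection (or stream) of Analytics that later steps can use.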

Upvotes: 3

Akhil Agarwal

Reputation: 41

This is how I ended up iterating over it, though I'm not sure it's the best solution.

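import org.apache.flink.streaming.api.datastream.DataStreamUtils
import scala.collection.JavaConverters._

// collect() runs the job and brings every element back to the client
val collection = DataStreamUtils.collect(stream.javaStream)
val results: Seq[Analytics] = collection.asScala.toList
for (result <- results) {
    result.c = 0 // updates the collected copies, not the stream itself
}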
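If Analytics can be turned into a case class (an assumption; the question declares its fields as vars), a more idiomatic option is to avoid mutation entirely and emit a modified copy with copy(c = 0). The sketch uses a plain Scala Seq so it runs without Flink; with a DataStream the same function would be passed to stream.map. The field types and the helper name resetC are hypothetical.

```scala
// Hypothetical immutable version of the class; field names follow the question, types are assumed.
case class Analytics(a: Int, b: Int, c: Int)

object CopyInsteadOfMutate {
  // Returns a new element with c reset to 0; the input element is left unchanged.
  def resetC(x: Analytics): Analytics = x.copy(c = 0)

  def main(args: Array[String]): Unit = {
    val results = Seq(Analytics(1, 2, 3), Analytics(4, 5, 6))
    val reset = results.map(resetC)
    println(reset)            // List(Analytics(1,2,0), Analytics(4,5,0))
    println(results.map(_.c)) // List(3, 6)
  }
}
```

Because nothing is mutated, the same function behaves identically whether it is applied to a collected Seq in a test or inside a streaming map.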

Upvotes: 0
