I have an ordered stream of data, in which equal items are always adjacent:
A A A B B C C C C ... (very long)
I want to transform it into a stream of aggregates of the form (item, count):
(A, 3) (B, 2) (C, 4)
What operators could I use in Akka Streams for this?
    Source.fromPublisher(publisher)
      .aggregateSomehow() // ?
      .runWith(sink)
I've looked into .groupBy, but it requires knowing the number of categories in advance (its maxSubstreams parameter), which I don't. I also believe it keeps all groups in memory, which I'd like to avoid: I should be able to discard (A, 3) once it has been processed and free the resources it consumes.
Edit: A related question asks for similar functionality using SubFlows. However, SubFlows don't seem to be required, because I have a solution using the statefulMapConcat combinator.
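To pin down the intended semantics and the memory bound, here is a minimal sketch of the same transformation outside Akka, over a plain Scala Iterator. `runLengths` is a hypothetical helper, not an Akka or standard-library API. It pulls elements lazily and holds only the current element and its count, so a finished run such as (A, 3) is not retained after it is returned, and it works on unbounded input.

```scala
// Hypothetical helper (not a library API): lazily run-length-encode an iterator
// whose equal elements are adjacent. Only the current run is held in memory.
def runLengths[A](in: Iterator[A]): Iterator[(A, Int)] = {
  val buffered = in.buffered // lets us peek at the next element via .head
  new Iterator[(A, Int)] {
    def hasNext: Boolean = buffered.hasNext

    def next(): (A, Int) = {
      val current = buffered.next() // first element of the next run
      var count = 1
      // consume the rest of the run without materializing it
      while (buffered.hasNext && buffered.head == current) {
        buffered.next()
        count += 1
      }
      (current, count)
    }
  }
}

println(runLengths(Iterator("A", "A", "A", "B", "B", "C", "C", "C", "C")).toList)
// List((A,3), (B,2), (C,4))
```

An Iterator works here only because it is pull-based and single-consumer; in Akka Streams the same per-run state has to live inside an operator such as statefulMapConcat.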
One option is to use the statefulMapConcat combinator:
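    import akka.actor.ActorSystem
    import akka.stream.scaladsl.Source

    // Akka 2.6+: an implicit ActorSystem also provides the Materializer
    implicit val system: ActorSystem = ActorSystem("run-length")

    // The trailing "" is a sentinel that marks the end of the stream
    Source(List("A", "A", "B", "B", "B", "C", "C", ""))
      .statefulMapConcat { () =>
        // state of the run in progress, created afresh for each materialization
        var lastChar = ""
        var count = 0
        char => if (lastChar == char) {
          count += 1
          List.empty
        } else {
          // the element changed: emit the finished run,
          // skipping the empty initial state (count == 0)
          val charCount = (lastChar, count)
          lastChar = char
          count = 1
          if (charCount._2 > 0) List(charCount) else List.empty
        }
      }
      .runForeach(println)

However, this requires appending a sentinel element (here the empty string "") to the input stream to mark its end: a run is only emitted when a different element arrives after it, so without the sentinel the last run would never be emitted.

Output:

    (A,2)
    (B,3)
    (C,2)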
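A hard-coded "" sentinel can collide with real data. A minimal sketch of a generic variant, under the assumption that wrapping elements is acceptable: each element arrives as Some(x) and a single trailing None marks the end of the stream. `runLengthStep` is a hypothetical name, not an Akka API; it returns the per-element function that statefulMapConcat expects from its factory, and it uses only the Scala standard library so the logic can be checked without running a stream.

```scala
// Hypothetical helper (not an Akka API). Each call creates fresh state, so it can
// serve as the factory body for statefulMapConcat. Some(x) = element, None = end.
def runLengthStep[A](): Option[A] => List[(A, Int)] = {
  var current: Option[A] = None // element of the run in progress, if any
  var count = 0                 // length of that run
  val step: Option[A] => List[(A, Int)] = {
    case Some(x) if current.contains(x) =>
      // same element as the run in progress: just count it
      count += 1
      Nil
    case Some(x) =>
      // a new run starts: emit the finished one (nothing before the first element)
      val finished = current.map(c => (c, count)).toList
      current = Some(x)
      count = 1
      finished
    case None =>
      // end-of-stream marker: flush the run in progress
      val finished = current.map(c => (c, count)).toList
      current = None
      count = 0
      finished
  }
  step
}

val input = List("A", "A", "A", "B", "B", "C", "C", "C", "C").map(x => Option(x)) :+ None
println(input.flatMap(runLengthStep[String]()))
// List((A,3), (B,2), (C,4))
```

In a stream this could be wired up roughly as `source.map(x => Option(x)).concat(Source.single(Option.empty[String])).statefulMapConcat(() => runLengthStep[String]())`; treat that wiring as an untested assumption. If your Akka version provides the statefulMap operator, its onComplete callback may let you flush the last run without any end marker; check the documentation for your version.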
Thanks to @chunjef for the suggestion in the comments.