Reputation: 10890
I have an actor which receives WeatherConditions
and pushes it (by using OfferAsync
) it to source
. Currently it is setup to run for each item it receives (it stores it to db).
public class StoreConditionsActor : ReceiveActor
{
public StoreConditionsActor(ITemperatureDataProvider temperatureDataProvider)
{
var materializer = Context.Materializer();
var source = Source.Queue<WeatherConditions>(10, OverflowStrategy.DropTail);
var graph = source
.To(Sink.ForEach<WeatherConditions>(conditions => temperatureDataProvider.Store(conditions)))
.Run(materializer);
Receive<WeatherConditions>(i =>
{
graph.OfferAsync(i);
});
}
}
What I would like to achieve is:
WeatherConditions
from all items received in this N minutes time windowI've been trying ConflateWithSeed
, Buffer
, Throttle
but neither seems to be working (I'm newbie in Akka / Akka Streams so I may be missing something basic)
Upvotes: 0
Views: 365
Reputation: 19507
This answer uses Akka Streams and Scala, but perhaps it will inspire your Akka.NET solution.
The groupedWithin
method could meet your first requirement:
val queue =
Source.queue[Int](10, OverflowStrategy.dropTail)
.groupedWithin(10, 1 second)
.map(group => group.sum / group.size)
.toMat(Sink.foreach(println))(Keep.left)
.run()
Source(1 to 10000)
.throttle(10, 1 second)
.mapAsync(1)(queue.offer(_))
.runWith(Sink.ignore)
In the above example, up to 10 integers per second are offered to the SourceQueue
, which groups the incoming elements in one-second bundles and calculates the respective averages of each bundle.
As for your second requirement, you could use sliding
to compare an element with the previous element. The below example passes an element downstream only if it is at least 30% greater than the previous element:
val source: Source[Int, _] = ???
source
.sliding(2, 1)
.collect {
case Seq(a, b) if b >= 1.3 * a => b
}
.runForeach(println)
Upvotes: 3