v00d00

Reputation: 3265

Reactive Extensions buffer events until requested

Here is a demo app I prepared.

using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var stored = new ConcurrentQueue<long>();

        Observable.Interval(TimeSpan.FromMilliseconds(20))
            .Subscribe(it => stored.Enqueue(it));

        var random = new Random();

        Task.Run(async () =>
        {
            while (true)
            {
                await Task.Delay((int)(random.NextDouble() * 1000));
                var currBatch = stored.ToArray();
                for (int i = 0; i < currBatch.Length; i++)
                {
                    long res;
                    stored.TryDequeue(out res);
                }
                Console.WriteLine("[" + string.Join(",", currBatch) + "]");
            }
        });

        Console.ReadLine();
    }
}

It simulates an independent consumer that fires at random time intervals. In the real app the events would come from the file system and might be bursty.

What it does is store an unbounded number of events in a concurrent queue until the consumer decides to consume the gathered events.

I have a strong feeling that this code is unsafe. Is it possible to reproduce this behaviour in a purely Rx manner?

If not, can you suggest a better / safer approach?

Upvotes: 1

Views: 207

Answers (1)

Andrei Tătar

Reputation: 8295

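Here you go. Window with a closing selector does this: every time the consumer pushes a value into notify, the current window closes and is delivered as one batch: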

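using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

var producer = Observable.Interval(TimeSpan.FromMilliseconds(20));
var random = new Random();

Task.Run(async () =>
{
    // The consumer pushes a value into this subject whenever it wants the next batch.
    var notify = new Subject<int>();

    // Each notification closes the current window; ToList turns that window into one batch.
    producer.Window(() => notify)
        .SelectMany(ev => ev.ToList())
        .Subscribe(currBatch => Console.WriteLine("[" + string.Join(",", currBatch) + "]"));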

    while (true)
    {
        await Task.Delay((int)(random.NextDouble() * 1000));
        notify.OnNext(1);
    }
});

Console.ReadLine();
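
As a possible simplification, the Buffer overload that takes a boundary observable should give the same batching without the Window / ToList pair. Below is a minimal sketch, assuming the System.Reactive package is referenced. To keep it deterministic, the producer is a Subject instead of Observable.Interval, and the names BufferOnDemand and CollectBatches are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class BufferOnDemand
{
    // Hypothetical helper: replays a fixed script of events and
    // returns every batch that was delivered, formatted as a string.
    public static List<string> CollectBatches()
    {
        var producer = new Subject<long>(); // assumption: stands in for the real event source
        var notify = new Subject<Unit>();   // the consumer's "give me the batch" signal
        var batches = new List<string>();

        // Buffer(notify) collects items until notify fires, then emits them as one list.
        using (producer.Buffer(notify)
            .Subscribe(batch => batches.Add("[" + string.Join(",", batch) + "]")))
        {
            producer.OnNext(0);
            producer.OnNext(1);
            notify.OnNext(Unit.Default); // flushes the two buffered items
            notify.OnNext(Unit.Default); // nothing arrived in between, so the batch is empty
            producer.OnNext(2);
            notify.OnNext(Unit.Default); // flushes the single remaining item
        }

        return batches;
    }

    static void Main()
    {
        foreach (var batch in CollectBatches())
        {
            Console.WriteLine(batch);
        }
    }
}
```

With the real Observable.Interval producer, the only change would be to call notify.OnNext(Unit.Default) from the consumer loop after each Task.Delay, as in the code above. Note that a request with no pending events still yields an empty batch, so filter those out with Where if they are not wanted.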

Upvotes: 2
