Thomas Levesque

Reputation: 292735

In Rx, how to group latest items after a period of time?

Sorry if the title isn't very clear, I couldn't think of anything better...

I'm receiving user input in the form of an IObservable<char>, and I'd like to transform it into an IObservable<char[]> by grouping the chars every time the user stops typing for more than 1 second. So, for instance, if the input is as follows:

h
e
l
l
o
(pause)
w
o
r
l
d
(pause)
!
(pause)

I'd like the output observable to be:

['h', 'e', 'l', 'l', 'o']
['w', 'o', 'r', 'l', 'd']
['!']

I suspect the solution is fairly simple, but I can't find the right approach... I tried to use Buffer, GroupByUntil, Throttle and a few others, to no avail.

Any idea would be welcome!


EDIT: I've got something that almost works:

        _input.Buffer(() => _input.Delay(TimeSpan.FromSeconds(1)))
              .ObserveOnDispatcher()
              .Subscribe(OnCompleteInput);

But I need the delay to be reset every time a new character is typed...

Upvotes: 8

Views: 1213

Answers (4)

Gideon Engelberth

Reputation: 6155

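Buffer and Throttle are enough, provided your source is hot. The source gets subscribed to more than once here (by Buffer itself, and by Throttle each time the closing selector runs), so if it isn't already hot, you can use .Publish().RefCount() to ensure you only end up with one subscription to the source.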

// Extension methods must be static and declared inside a static class.
public static IObservable<IList<T>> BufferWithInactivity<T>(
    this IObservable<T> source, TimeSpan dueTime)
{
    if (source == null) throw new ArgumentNullException("source");
    //defer dueTime checking to Throttle
    var hot = source.Publish().RefCount();
    return hot.Buffer(() => hot.Throttle(dueTime));
}
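
For illustration, here is a minimal console sketch of how the method above might be exercised. The Demo class and the Subject<char> standing in for the real keyboard source are assumptions made for this example, not part of the question's code, and it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

static class Demo
{
    // Same idea as the extension method above: share one subscription,
    // then close each buffer when Throttle reports a quiet period.
    static IObservable<IList<T>> BufferWithInactivity<T>(
        this IObservable<T> source, TimeSpan dueTime)
    {
        if (source == null) throw new ArgumentNullException("source");
        var hot = source.Publish().RefCount();
        return hot.Buffer(() => hot.Throttle(dueTime));
    }

    static void Main()
    {
        // Hypothetical stand-in for the real IObservable<char> input.
        var input = new Subject<char>();

        using (input.BufferWithInactivity(TimeSpan.FromSeconds(1))
                    .Subscribe(chars => Console.WriteLine(new string(chars.ToArray()))))
        {
            foreach (var c in "hello") input.OnNext(c);
            Thread.Sleep(1500); // pause longer than the 1 second threshold

            foreach (var c in "world") input.OnNext(c);
            Thread.Sleep(1500);

            input.OnNext('!');
            Thread.Sleep(1500);
        }
    }
}
```

With those pauses, the subscriber should receive hello, world and ! as three separate buffers. In a UI application you would drop the Thread.Sleep calls and add ObserveOnDispatcher() before Subscribe, as in the question.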

Upvotes: 7

Tim S.

Reputation: 56586

This ought to work. It's not nearly as concise as your solution, as it implements the logic through a class instead of extension methods, but it might be a better way to do it. In short: every time you get a char, add it to a List and (re)start a timer that will expire in one second; when the timer expires, notify our subscribers with the List as an array and reset the state so it's ready for next time.

    class Breaker : IObservable<char[]>, IObserver<char>
    {
        List<IObserver<char[]>> observers = new List<IObserver<char[]>>();
        List<char> currentChars;
        DispatcherTimer t;
        public Breaker(IObservable<char> source)
        {
            // Set up state before subscribing, in case the source emits synchronously.
            currentChars = new List<char>();
            t = new DispatcherTimer { Interval = new TimeSpan(0, 0, 1) };
            t.Tick += TimerOver;
            source.Subscribe(this);
        }
        public IDisposable Subscribe(IObserver<char[]> observer)
        {
            observers.Add(observer);
            return null; //TODO return a useful IDisposable
        }
        public void OnCompleted()
        {
            //TODO implement completion logic
        }
        public void OnError(Exception e)
        {
            //TODO implement error logic
        }
        public void OnNext(char value)
        {
            currentChars.Add(value);
            // Stop before Start so the one-second interval restarts on every character.
            t.Stop();
            t.Start();
        }
        void TimerOver(object sender, EventArgs e)
        {
            char[] chars = currentChars.ToArray();
            foreach (var obs in observers)
                obs.OnNext(chars);
            currentChars.Clear();
            t.Stop();
        }
    }

Upvotes: 0

Enigmativity

Reputation: 117175

I wrote an extension some time ago to do what you're after - BufferWithInactivity.

Here it is:

public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
    this IObservable<T> source,
    TimeSpan inactivity,
    int maximumBufferSize)
{
    return Observable.Create<IEnumerable<T>>(o =>
    {
        var gate = new object();
        var buffer = new List<T>();
        var mutable = new SerialDisposable();
        var subscription = (IDisposable)null;
        var scheduler = Scheduler.ThreadPool;

        Action dump = () =>
        {
            var bts = buffer.ToArray();
            buffer = new List<T>();
            if (o != null)
            {
                o.OnNext(bts);
            }
        };

        Action dispose = () =>
        {
            if (subscription != null)
            {
                subscription.Dispose();
            }
            mutable.Dispose();
        };

        Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted =
            onAction =>
            {
                lock (gate)
                {
                    dispose();
                    dump();
                    if (o != null)
                    {
                        onAction(o);
                    }
                }
            };

        Action<Exception> onError = ex =>
            onErrorOrCompleted(x => x.OnError(ex));

        Action onCompleted = () => onErrorOrCompleted(x => x.OnCompleted());

        Action<T> onNext = t =>
        {
            lock (gate)
            {
                buffer.Add(t);
                if (buffer.Count == maximumBufferSize)
                {
                    dump();
                    mutable.Disposable = Disposable.Empty;
                }
                else
                {
                    mutable.Disposable = scheduler.Schedule(inactivity, () =>
                    {
                        lock (gate)
                        {
                            dump();
                        }
                    });
                }
            }
        };

        subscription =
            source
                .ObserveOn(scheduler)
                .Subscribe(onNext, onError, onCompleted);

        return () =>
        {
            lock (gate)
            {
                o = null;
                dispose();
            }
        };
    });
}

Upvotes: 0

Thomas Levesque

Reputation: 292735

OK, I found a solution:

        Func<IObservable<char>> bufferClosingSelector =
            () =>
            _input.Timeout(TimeSpan.FromSeconds(1))
                  .Catch(Observable.Return('\0'))
                  .Where(i => i == '\0');
        _input.Buffer(bufferClosingSelector)
              .ObserveOnDispatcher()
              .Subscribe(OnCompleteInput);

Basically, the bufferClosingSelector pushes something whenever a timeout occurs, which closes the current buffer. There's probably a simpler and more elegant way, but it works... I'm open to better suggestions ;)

Upvotes: 0
