Reputation: 292735
Sorry if the title isn't very clear, I couldn't think of anything better...
I'm receiving user input in the form of an IObservable<char>, and I'd like to transform it to an IObservable<char[]> by grouping the chars every time the user stops typing for more than 1 second. So, for instance, if the input is as follows:
h
e
l
l
o
(pause)
w
o
r
l
d
(pause)
!
(pause)
I'd like the output observable to be:
['h', 'e', 'l', 'l', 'o']
['w', 'o', 'r', 'l', 'd']
['!']
I suspect the solution is fairly simple, but I can't find the right approach... I tried Buffer, GroupByUntil, Throttle and a few others, to no avail.
Any idea would be welcome!
EDIT: I've got something that almost works:
    _input.Buffer(() => _input.Delay(TimeSpan.FromSeconds(1)))
          .ObserveOnDispatcher()
          .Subscribe(OnCompleteInput);
But I need the delay to be reset every time a new character is typed...
Upvotes: 8
Views: 1213
Reputation: 6155
Buffer and Throttle would be enough if your source is hot. To make it hot, you can use .Publish().RefCount(), which ensures you only end up with one subscription to the source.
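    // Extension methods must be static and declared inside a static class
    public static IObservable<IList<T>> BufferWithInactivity<T>(
        this IObservable<T> source,
        TimeSpan dueTime)
    {
        if (source == null) throw new ArgumentNullException("source");
        // defer dueTime checking to Throttle

        // Share a single subscription between Buffer and the closing Throttle
        var hot = source.Publish().RefCount();
        // Throttle fires once the source has been quiet for dueTime, closing the buffer
        return hot.Buffer(() => hot.Throttle(dueTime));
    }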
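To see the grouping in action, here is a minimal console sketch. It repeats the extension method so the block is self-contained, and it assumes the System.Reactive NuGet package is referenced. The Subject<char> standing in for the user input, the class names and the sleep durations are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class BufferExtensions
{
    public static IObservable<IList<T>> BufferWithInactivity<T>(
        this IObservable<T> source, TimeSpan dueTime)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        // One shared subscription feeds both Buffer and the closing Throttle
        var hot = source.Publish().RefCount();
        return hot.Buffer(() => hot.Throttle(dueTime));
    }
}

public static class Demo
{
    public static void Main()
    {
        // Hypothetical stand-in for the real keyboard input
        var input = new Subject<char>();

        using (input.BufferWithInactivity(TimeSpan.FromSeconds(1))
                    .Subscribe(chars => Console.WriteLine(string.Join(",", chars))))
        {
            foreach (var c in "hello") input.OnNext(c);
            Thread.Sleep(1500);   // pause longer than the 1 s inactivity window
            foreach (var c in "world") input.OnNext(c);
            Thread.Sleep(1500);   // give the last buffer time to close
        }
    }
}
```

With these timings, each burst of characters should come out as one group once a second of silence has passed. Because Throttle only fires after at least one value, idle periods should not produce empty buffers.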
Upvotes: 7
Reputation: 56586
This ought to work. It's not nearly as concise as your solution, as it implements the logic through a class instead of extension methods, but it might be a better way to do it. In short: every time you get a char, add it to a List and (re)start a timer that will expire in one second; when the timer expires, notify the subscribers with the List as an array and reset the state so it's ready for next time.
    class Breaker : IObservable<char[]>, IObserver<char>
    {
        readonly List<IObserver<char[]>> observers = new List<IObserver<char[]>>();
        readonly List<char> currentChars = new List<char>();
        readonly DispatcherTimer t;

        public Breaker(IObservable<char> source)
        {
            // Set up the timer before subscribing, in case the source pushes values immediately
            t = new DispatcherTimer { Interval = TimeSpan.FromSeconds(1) };
            t.Tick += TimerOver;
            source.Subscribe(this);
        }

        public IDisposable Subscribe(IObserver<char[]> observer)
        {
            observers.Add(observer);
            // Disposable.Create lives in System.Reactive.Disposables
            return Disposable.Create(() => observers.Remove(observer));
        }

        public void OnCompleted()
        {
            //TODO implement completion logic
        }

        public void OnError(Exception e)
        {
            //TODO implement error logic
        }

        public void OnNext(char value)
        {
            currentChars.Add(value);
            // Stop first so the one-second interval restarts on every character
            t.Stop();
            t.Start();
        }

        void TimerOver(object sender, EventArgs e)
        {
            t.Stop();
            char[] chars = currentChars.ToArray();
            currentChars.Clear();
            // Iterate over a copy so observers can unsubscribe from within OnNext
            foreach (var obs in observers.ToArray())
                obs.OnNext(chars);
        }
    }
Upvotes: 0
Reputation: 117175
I wrote an extension some time ago to do what you're after: BufferWithInactivity.
Here it is:
    public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
        this IObservable<T> source,
        TimeSpan inactivity,
        int maximumBufferSize)
    {
        return Observable.Create<IEnumerable<T>>(o =>
        {
            var gate = new object();
            var buffer = new List<T>();
            var mutable = new SerialDisposable();
            var subscription = (IDisposable)null;
            // Marked obsolete in later Rx releases; ThreadPoolScheduler.Instance
            // or Scheduler.Default are the usual replacements
            var scheduler = Scheduler.ThreadPool;

            // Emits the current buffer and starts a fresh one
            Action dump = () =>
            {
                var bts = buffer.ToArray();
                buffer = new List<T>();
                if (o != null)
                {
                    o.OnNext(bts);
                }
            };

            Action dispose = () =>
            {
                if (subscription != null)
                {
                    subscription.Dispose();
                }
                mutable.Dispose();
            };

            // Flushes whatever is buffered before forwarding the terminal notification
            Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted =
                onAction =>
                {
                    lock (gate)
                    {
                        dispose();
                        dump();
                        if (o != null)
                        {
                            onAction(o);
                        }
                    }
                };

            Action<Exception> onError = ex =>
                onErrorOrCompleted(x => x.OnError(ex));

            Action onCompleted = () => onErrorOrCompleted(x => x.OnCompleted());

            Action<T> onNext = t =>
            {
                lock (gate)
                {
                    buffer.Add(t);
                    if (buffer.Count == maximumBufferSize)
                    {
                        dump();
                        mutable.Disposable = Disposable.Empty;
                    }
                    else
                    {
                        // Assigning a new timer cancels the previous one,
                        // so the inactivity window restarts on every value
                        mutable.Disposable = scheduler.Schedule(inactivity, () =>
                        {
                            lock (gate)
                            {
                                dump();
                            }
                        });
                    }
                }
            };

            subscription =
                source
                    .ObserveOn(scheduler)
                    .Subscribe(onNext, onError, onCompleted);

            return () =>
            {
                lock (gate)
                {
                    o = null;
                    dispose();
                }
            };
        });
    }
Upvotes: 0
Reputation: 292735
OK, I found a solution:
    Func<IObservable<char>> bufferClosingSelector =
        () =>
            _input.Timeout(TimeSpan.FromSeconds(1))
                  .Catch(Observable.Return('\0'))
                  .Where(i => i == '\0');

    _input.Buffer(bufferClosingSelector)
          .ObserveOnDispatcher()
          .Subscribe(OnCompleteInput);
Basically, the bufferClosingSelector pushes something whenever a timeout occurs, which closes the current buffer. There's probably a simpler and more elegant way, but it works... I'm open to better suggestions ;)
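One thing to watch for with this approach: the Timeout timer starts as soon as each new closing sequence is subscribed, so an idle second with no input at all also closes a buffer, and that buffer is empty. Below is a minimal console sketch of the same selector with those empty buffers filtered out. It assumes the System.Reactive NuGet package is referenced. The Subject<char> standing in for _input, the class name and the sleep durations are made up for illustration, and the ObserveOnDispatcher call is left out because there is no UI dispatcher in a console program.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class TimeoutBufferDemo
{
    public static void Main()
    {
        // Hypothetical stand-in for _input; a Subject is hot, so the
        // buffer and the closing selector see the same values
        var input = new Subject<char>();

        // Emits '\0' when no character arrives for one second
        Func<IObservable<char>> bufferClosingSelector = () =>
            input.Timeout(TimeSpan.FromSeconds(1))
                 .Catch(Observable.Return('\0'))
                 .Where(c => c == '\0');

        using (input.Buffer(bufferClosingSelector)
                    .Where(chars => chars.Count > 0)   // drop buffers closed while idle
                    .Subscribe(chars => Console.WriteLine(string.Join(",", chars))))
        {
            foreach (var c in "hello") input.OnNext(c);
            Thread.Sleep(2500);   // long enough for an idle timeout to fire as well
            input.OnNext('!');
            Thread.Sleep(1500);   // give the last buffer time to close
        }
    }
}
```

Without the Where filter, the long pause would be expected to push an extra empty list to the subscriber, so OnCompleteInput would need to tolerate that.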
Upvotes: 0