Reputation: 3858
IMPORTANT: for a description of the results and some more details, please also have a look at my answer.
I need to group and filter a sequence of objects/events that are usually replicated, buffering them over a TimeSpan interval. I will try to explain it better with a sort of marble diagram:
X-X-X-X-X-Y-Y-Y-Z-Z-Z-Z-X-X-Y-Z-Z
would produce
X---Y---Z---X---Y---Z
where X, Y and Z are different event types, and '---' means the interval. Additionally, I would also like to apply a distinct on a key property that is available on all types, because they have a common base class:
X, Y, Z : A
and A contains a property Key. Using the notation X.a to mean X.Key = a, a final sample would be:
X.a-X.b-X.a-Y.b-Y.c-Z.a-Z.a-Z.c-Z.b-Z.c
would produce
X.a-X.b---Y.b-Y.c-Z.a-Z.c-Z.b
Can anybody help me put together the required Linq operators (probably DistinctUntilChanged and Buffer) to achieve this behavior? Thanks
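As a side note on the two kinds of de-duplication involved here: below is a minimal in-memory sketch, assuming plain lists instead of observables, of how "suppress consecutive repeats" differs from "distinct over a whole window" on the sample above. The `DedupSketch` class and its method names are hypothetical; they only mimic the semantics and are not the Rx operators themselves. Tuples stand in for the X, Y, Z types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DedupSketch
{
    // The sample from above: X.a-X.b-X.a-Y.b-Y.c-Z.a-Z.a-Z.c-Z.b-Z.c
    public static readonly (string Type, char Key)[] Sample =
    {
        ("X", 'a'), ("X", 'b'), ("X", 'a'), ("Y", 'b'), ("Y", 'c'),
        ("Z", 'a'), ("Z", 'a'), ("Z", 'c'), ("Z", 'b'), ("Z", 'c')
    };

    // Drops an item only when it equals the item immediately before it.
    public static List<T> UntilChanged<T>(IEnumerable<T> source)
    {
        var comparer = EqualityComparer<T>.Default;
        var result = new List<T>();
        foreach (var item in source)
        {
            if (result.Count == 0 || !comparer.Equals(result[result.Count - 1], item))
            {
                result.Add(item);
            }
        }
        return result;
    }

    // Drops every repeat inside the window, wherever it occurs.
    public static List<T> DistinctInWindow<T>(IEnumerable<T> window)
    {
        return window.Distinct().ToList();
    }

    // Formats items in the X.a notation used above.
    public static string Render(IEnumerable<(string Type, char Key)> items)
    {
        return string.Join("-", items.Select(i => i.Type + "." + i.Key));
    }
}
```

Treating the whole sample as a single window, `DedupSketch.DistinctInWindow(DedupSketch.Sample)` keeps seven items, the same set as in the expected output, whereas `DedupSketch.UntilChanged(DedupSketch.Sample)` only removes the doubled Z.a and keeps nine, because X.a, X.b, X.a are never adjacent repeats.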
UPDATE 18.08.12:
As requested, I will try to give a better explanation. We have devices collecting and sending events to a web service. These devices have old logic (which we can't change, for backward compatibility): they keep sending the same event until they receive an acknowledgement; after the acknowledgement, they send the next event in their queue, and so on. Events contain the network address of the unit and some other properties that distinguish the events in each device's queue. An event looks like this:
class Event
{
public string NetworkAddress { get; }
public string EventCode { get; }
public string AdditionalAttribute { get; }
}
The goal is to process, every 5 seconds, the distinct events received from all devices, storing information in the database (that's why we don't want to do it in batches) and sending the ack to the device. Let's take an example with only two devices and some events:
Device 'a':
Event 1 (a1): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'x'
Event 2 (a2): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'y'
Event 3 (a3): NetworkAddress = '1', EventCode = B, AdditionalAttribute = 'x'
Device 'b':
Event 1 (b1): NetworkAddress = '2', EventCode = A, AdditionalAttribute = 'y'
Event 2 (b2): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'x'
Event 3 (b3): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'y'
Event 4 (b4): NetworkAddress = '2', EventCode = C, AdditionalAttribute = 'x'
Pn are the operations done by our server, explained later
Possible marble diagram (input streams + output stream):
Device 'a' : -[a1]-[a1]-[a1]----------------[a2]-[a2]-[a2]-[a3]-[a3]-[a3]-...
Device 'b' : ------[b1]-[b1]-[b2]-[b2]-[b2]------[b3]-[b3]-[b4]-[b4]-[b4]-...
Time : ------------[1s]-----------[2s]------------[3s]------------[4s]-
DB/acks (rx output) : ------------[P1]-----------[P2]------------[P3]------------[P4]-
P1: Server stores and acknowledges [a1] and [b1]
P2: " " " " [b2]
P3: " " " " [a2] and [b3]
P4: " " " " [a3] and [b4]
In the end I think it is probably a simple combination of basic operators, but I'm new to Rx and a bit confused, since there seem to be lots of operators (or combinations of operators) that produce the same output stream.
Update 19.08.12:
Please keep in mind that this code runs on a server and it should run for days without memory leaks...I'm not sure about the behavior of subjects. At the moment, for each event I call a push operation on a service, which calls the OnNext of a Subject on top of which I should build the query (if I'm not wrong about the usage of subjects).
Update 20.08.12:
Current implementation, including a validation test; this is what I tried, and it seems to be the same as what @yamen suggested.
public interface IEventService
{
// Persists the events
void Add(IEnumerable<Event> events);
}
public class Event
{
public string Description { get; set; }
}
/// <summary>
/// Implements the logic to handle events.
/// </summary>
public class EventManager : IDisposable
{
private static readonly TimeSpan EventHandlingPeriod = TimeSpan.FromSeconds(5);
private readonly Subject<EventMessage> subject = new Subject<EventMessage>();
private readonly IDisposable subscription;
private readonly object locker = new object();
private readonly IEventService eventService;
/// <summary>
/// Initializes a new instance of the <see cref="EventManager"/> class.
/// </summary>
/// <param name="scheduler">The scheduler.</param>
public EventManager(IEventService eventService, IScheduler scheduler)
{
this.eventService = eventService;
this.subscription = this.CreateQuery(scheduler);
}
/// <summary>
/// Pushes the event.
/// </summary>
/// <param name="eventMessage">The event message.</param>
public void PushEvent(EventMessage eventMessage)
{
Contract.Requires(eventMessage != null);
this.subject.OnNext(eventMessage);
}
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
/// <filterpriority>2</filterpriority>
public void Dispose()
{
this.Dispose(true);
}
private void Dispose(bool disposing)
{
if (disposing)
{
// Dispose managed resources: stop the query first, then the subject feeding it
this.subscription.Dispose();
this.subject.Dispose();
}
}
private IDisposable CreateQuery(IScheduler scheduler)
{
var buffered = this.subject
.DistinctUntilChanged(new EventComparer())
.Buffer(EventHandlingPeriod, scheduler);
var query = buffered
.Subscribe(this.HandleEvents);
return query;
}
private void HandleEvents(IList<EventMessage> eventMessages)
{
Contract.Requires(eventMessages != null);
var events = eventMessages.Select(this.SelectEvent);
this.eventService.Add(events);
}
private Event SelectEvent(EventMessage message)
{
return new Event { Description = "evaluated description" };
}
private class EventComparer : IEqualityComparer<EventMessage>
{
public bool Equals(EventMessage x, EventMessage y)
{
return x.NetworkAddress == y.NetworkAddress && x.EventCode == y.EventCode && x.Attribute == y.Attribute;
}
public int GetHashCode(EventMessage obj)
{
var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute);
return s.GetHashCode();
}
}
}
public class EventMessage
{
public string NetworkAddress { get; set; }
public byte EventCode { get; set; }
public byte Attribute { get; set; }
// Other properties
}
And the test:
public void PushEventTest()
{
const string Address1 = "A:2.1.1";
const string Address2 = "A:2.1.2";
var eventServiceMock = new Mock<IEventService>();
var scheduler = new TestScheduler();
var target = new EventManager(eventServiceMock.Object, scheduler);
var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 };
var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
scheduler.Schedule(() => target.PushEvent(eventMessageA1));
scheduler.Schedule(TimeSpan.FromSeconds(1), () => target.PushEvent(eventMessageB1));
scheduler.Schedule(TimeSpan.FromSeconds(2), () => target.PushEvent(eventMessageA1));
scheduler.AdvanceTo(TimeSpan.FromSeconds(6).Ticks);
eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 2)), Times.Once());
scheduler.Schedule(TimeSpan.FromSeconds(3), () => target.PushEvent(eventMessageB1));
scheduler.AdvanceTo(TimeSpan.FromSeconds(11).Ticks);
eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 1)), Times.Once());
}
Additionally, I stress again that it is really important that the software can run for days without problems, handling thousands of messages. To make it clear: the test doesn't pass with the current implementation.
Upvotes: 10
Views: 3377
Reputation: 3858
After some searching and experimenting, I put together some code that produces the output I expect:
static void Main(string[] args)
{
const string Address1 = "A:2.1.1";
const string Address2 = "A:2.1.2";
var comparer = new EventComparer();
var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 };
var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 };
var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 5 };
var list = new[] { eventMessageA1, eventMessageA1, eventMessageB1, eventMessageA2, eventMessageA1, eventMessageA1 };
var queue = new BlockingCollection<EventMessage>();
Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe
(
l => list.ToList().ForEach(m =>
{
Console.WriteLine("Producing {0} on thread {1}", m, Thread.CurrentThread.ManagedThreadId);
queue.Add(m);
})
);
// subscribing
queue.GetConsumingEnumerable()
.ToObservable()
.Buffer(TimeSpan.FromSeconds(5))
.Subscribe(e =>
{
Console.WriteLine("Queue contains {0} items", queue.Count);
e.Distinct(comparer).ToList().ForEach(m =>
Console.WriteLine("{0} - Consuming: {1} (queue contains {2} items)", DateTime.UtcNow, m, queue.Count));
}
);
Console.WriteLine("Type enter to exit");
Console.ReadLine();
}
public class EventComparer : IEqualityComparer<EventMessage>
{
public bool Equals(EventMessage x, EventMessage y)
{
var result = x.NetworkAddress == y.NetworkAddress && x.EventCode == y.EventCode && x.Attribute == y.Attribute;
return result;
}
public int GetHashCode(EventMessage obj)
{
var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute);
return s.GetHashCode();
}
}
public class EventMessage
{
public string NetworkAddress { get; set; }
public byte EventCode { get; set; }
public byte Attribute { get; set; }
public override string ToString()
{
const string Format = "{0} ({1}, {2})";
var s = string.Format(Format, this.NetworkAddress, this.EventCode, this.Attribute);
return s;
}
}
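The comparer above builds a temporary string for every hash computation. Here is a minimal sketch of an alternative that hashes the fields directly through a value tuple. `FieldwiseEventComparer` is a hypothetical name, and `EventMessage` is repeated (without `ToString`) only so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Same shape as the EventMessage class above, repeated so the sketch is self-contained.
public class EventMessage
{
    public string NetworkAddress { get; set; }
    public byte EventCode { get; set; }
    public byte Attribute { get; set; }
}

// Hypothetical alternative to EventComparer: no temporary string is allocated per hash.
public sealed class FieldwiseEventComparer : IEqualityComparer<EventMessage>
{
    public bool Equals(EventMessage x, EventMessage y)
    {
        if (ReferenceEquals(x, y)) return true;
        if (x is null || y is null) return false;
        return x.NetworkAddress == y.NetworkAddress
            && x.EventCode == y.EventCode
            && x.Attribute == y.Attribute;
    }

    public int GetHashCode(EventMessage obj)
    {
        // A value tuple combines the hash codes of its elements (a null string hashes to 0).
        return (obj.NetworkAddress, obj.EventCode, obj.Attribute).GetHashCode();
    }
}
```

It can be passed to `Distinct` exactly like the original comparer, for example `e.Distinct(new FieldwiseEventComparer())` inside the buffer handler.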
Anyway, monitoring the application, it seems that this causes a memory leak. My question is now:
UPDATE:
It seems that the memory growth lasts only a few minutes, then the value is stable. I will run a long test. Of course, this would be perfectly acceptable behavior.
UPDATE 26.08.12:
Anyway, I think that my question is still open as far as unit tests using the test scheduler are concerned.
Thanks, Francesco
Upvotes: 0
Reputation: 15618
Not sure if this is exactly what you want, but it seems to support your use cases.
First, let's define the base class to use (you can easily modify this to suit your needs):
public class MyEvent
{
public string NetworkAddress { set; get; }
public string EventCode { set; get; }
}
Let's set up your devices as an array of IObservable<MyEvent> (you may have these available differently, and the code below would of course need to change to accommodate that). These devices will each produce a value with a random delay between 0.5 and 1.5 seconds.
var deviceA = new MyEvent[] { new MyEvent() {NetworkAddress = "A", EventCode = "1"},
new MyEvent() {NetworkAddress = "A", EventCode = "1"},
new MyEvent() {NetworkAddress = "A", EventCode = "2"} };
var deviceB = new MyEvent[] { new MyEvent() {NetworkAddress = "B", EventCode = "1"},
new MyEvent() {NetworkAddress = "B", EventCode = "2"},
new MyEvent() {NetworkAddress = "B", EventCode = "2"},
new MyEvent() {NetworkAddress = "B", EventCode = "3"} };
var random = new Random();
var deviceARand = deviceA.ToObservable().Select(a => Observable.Return(a).Delay(TimeSpan.FromMilliseconds(random.Next(500,1500)))).Concat();
var deviceBRand = deviceB.ToObservable().Select(b => Observable.Return(b).Delay(TimeSpan.FromMilliseconds(random.Next(500,1500)))).Concat();
var devices = new IObservable<MyEvent>[] { deviceARand, deviceBRand };
Now let's take all of these individual device streams, make them 'distinct', and merge them into a single master stream:
var stream = devices.Aggregate(Observable.Empty<MyEvent>(), (acc, device) => acc.Merge(device.DistinctUntilChanged(a => a.EventCode)));
Once you have that, getting this stream to be consumed periodically is just a matter of buffering it with Buffer:
stream.Buffer(TimeSpan.FromSeconds(1)).Subscribe(x => { /* code here works on a list of the filtered events per second */ });
Upvotes: 2
Reputation: 4169
I'm not sure if this does exactly what you'd like, but you may be able to group the elements explicitly using the group keyword, and then manipulate the various IObservables separately before recombining them.
E.g. if we have class definitions such as
class A
{
public char Key { get; set; }
}
class X : A { }
...
and a Subject<A>
Subject<A> subject = new Subject<A>();
then we can write
var buffered =
from a in subject
group a by new { Type = a.GetType(), Key = a.Key } into g
from buffer in g.Buffer(TimeSpan.FromMilliseconds(300))
where buffer.Any()
select new
{
Count = buffer.Count,
Type = buffer.First().GetType().Name,
Key = buffer.First().Key
};
buffered.Do(Console.WriteLine).Subscribe();
We can test this with the data you provided:
subject.OnNext(new X { Key = 'a' });
Thread.Sleep(100);
subject.OnNext(new X { Key = 'b' });
Thread.Sleep(100);
subject.OnNext(new X { Key = 'a' });
Thread.Sleep(100);
...
subject.OnCompleted();
This gives the output you provided:
{ Count = 2, Type = X, Key = a }
{ Count = 1, Type = X, Key = b }
{ Count = 1, Type = Y, Key = b }
{ Count = 1, Type = Y, Key = c }
{ Count = 2, Type = Z, Key = a }
{ Count = 2, Type = Z, Key = c }
{ Count = 1, Type = Z, Key = b }
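Those counts can be sanity-checked without timers. Below is a minimal sketch, assuming all repeats of a given type and key land in the same buffer, that groups the question's sample with plain LINQ instead of Rx. `CountSketch` is a hypothetical name, and tuples stand in for the X, Y, Z classes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CountSketch
{
    // The sample from the question: X.a-X.b-X.a-Y.b-Y.c-Z.a-Z.a-Z.c-Z.b-Z.c
    public static readonly (string Type, char Key)[] Sample =
    {
        ("X", 'a'), ("X", 'b'), ("X", 'a'), ("Y", 'b'), ("Y", 'c'),
        ("Z", 'a'), ("Z", 'a'), ("Z", 'c'), ("Z", 'b'), ("Z", 'c')
    };

    // Groups by (Type, Key) and formats one line per group, in order of first appearance.
    public static List<string> CountByTypeAndKey(IEnumerable<(string Type, char Key)> events)
    {
        return events
            .GroupBy(e => e)
            .Select(g => $"{{ Count = {g.Count()}, Type = {g.Key.Type}, Key = {g.Key.Key} }}")
            .ToList();
    }
}
```

Calling `CountSketch.CountByTypeAndKey(CountSketch.Sample)` yields seven lines, one per distinct type and key pair, which is the shape the Rx query is expected to produce when the timing cooperates.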
Upvotes: 4