Reputation: 966
I am using Rx 2.2.5. I have two views which load sub-orders with:
_transportService
.ObserveSubOrder(parentOrder.OrderId)
.SubscribeOn(_backgroundScheduler)
.ObserveOn(_uiScheduler)
.Where(subOs => subOs != null)
.Snoop("BeforeGrpBy")
.GroupBy(subOs => subOs.OrderId)
.Subscribe(subOrdUpdates =>
{
AddIfNew(subOrdUpdates.Key, subOrdUpdates.Snoop("AfterGrpBy" + "--" + subOrdUpdates.Key));
})
Before the GroupBy, the sequence contains every element. The problem appears after the GroupBy: very rarely, an element goes missing. I don't think it's a concurrency issue, as the logs below show. A custom Snoop extension method generates these logs.
16:15:44.8169968 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})
16:15:44.8169968 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained
16:15:44.8369988 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on.
16:15:44.8379989 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})
16:15:44.8379989 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed.
16:15:44.8590010 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained
16:15:44.8600011 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on.
16:15:44.8610012 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed.
16:15:44.8620013 : (1) : AfterGrpBy--9Zsj8Z4sTRb: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})
Log format: Time : (Thread) : Msg
As you can see, OnNext is called twice before the GroupBy, but only once after it. Is there something wrong with my Rx usage here, or is this a known issue? Any insight would help. If any further clarification is required, kindly comment.
Update: Adding working/desirable logs:
16:15:45.1070258 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1280279 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained
16:15:45.1310282 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on.
16:15:45.1320283 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed.
16:15:45.1320283 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1330284 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1330284 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on.
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed.
16:15:45.1350286 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
Update2: Possible bug or feature
GroupBy fires the grouped observable only if fireNewMapEntry is true (GroupBy.cs), and that flag is set here:
if (!_map.TryGetValue(key, out writer))
{
writer = new Subject<TElement>();
_map.Add(key, writer);
fireNewMapEntry = true;
}
where _map is of type Dictionary<TKey, ISubject<TElement>>. Could this be the issue?
Upvotes: 2
Views: 294
Reputation: 10783
Just some notes on your code style (sorry, it isn't really an answer as I think @supertopi has answered)
Move your SubscribeOn and ObserveOn calls to be the last things you do before your final subscription. In your current code, you are executing the Where, the Snoop and the GroupBy all on the _uiScheduler, taking up precious cycles.
Avoid subscribing within a subscribe. It appears that AddIfNew takes a key and an IObservable<T>, thus I assume it is performing some subscription internally. Instead, lean on what you know. If you are using the GroupBy, then you know the Key will be unique the first time a group is yielded. So this can now just be an Add (if it is the key you are checking). You can also use Take(1) if you want to be explicit. If it is the value, not the key, that you are checking, then the GroupBy appears to be redundant.
Try to keep your variable names consistent, so that another developer reading through the query is guided nicely, instead of jumping between subOs, childOs and childUpdates, when childOrder seems to be a better name (imo).
Ideally, don't return null values in your observable sequence. What purpose does it serve? It may make sense on some rare occasions, but often I find that null is used instead of an OnCompleted to indicate that there are no values for this sequence.
e.g.
_transportService
.ObserveSubOrder(parentOrder.OrderId)
.Where(childOrder => childOrder != null)
.Snoop("BeforeGrpBy")
.GroupBy(childOrder => childOrder.OrderId)
.SelectMany(grp => grp.Take(1).Select(childOrder => Tuple.Create(grp.Key, childOrder)))
.SubscribeOn(_backgroundScheduler)
.ObserveOn(_uiScheduler)
.Subscribe(newGroup =>
{
Add(newGroup.Item1, newGroup.Item2);
},
ex => { /* obviously we have error handling here ;-) */ }
);
or
_transportService
.ObserveSubOrder(parentOrder.OrderId)
.Where(childOrder => childOrder != null)
.Snoop("BeforeGrpBy")
.SubscribeOn(_backgroundScheduler)
.ObserveOn(_uiScheduler)
.Subscribe(childOrder =>
{
AddIfNew(childOrder.OrderId, childOrder);
},
ex => { /* obviously we have error handling here ;-) */ }
);
and even better (without snoop and null checks)
var subscription = _transportService
.ObserveSubOrder(parentOrder.OrderId)
.SubscribeOn(_backgroundScheduler)
.ObserveOn(_uiScheduler)
.Subscribe(
childOrder => AddIfNew(childOrder.OrderId, childOrder),
ex => { /* obviously we have error handling here ;-) */ }
);
hth
Upvotes: 1
Reputation: 3488
You are missing the nature of GroupBy.
The operator emits OnNext only when a new group appears (see implementation GroupBy.cs:67). In your case, the OrderId is the same for both notifications, so only one OnNext is emitted.
The value emitted by the operator is an IGroupedObservable<TKey, TElement>, to which you can subscribe if you need access to further notifications within the group.
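To make the point above concrete, here is a minimal sketch, assuming a current System.Reactive package and a made-up source of (OrderId, Quantity) tuples; the GroupByDemo class and the log strings are purely illustrative and not from the question. It records one "group" entry per distinct key, while every element still arrives through the inner subscription. Since the snippet quoted in the question shows each group is backed by a plain Subject, an inner subscription that attaches later than the outer OnNext (for example on another thread) would presumably not see elements pushed before it subscribed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class GroupByDemo
{
    // Returns a log of what the outer and inner observers saw, in order.
    public static List<string> Run()
    {
        var log = new List<string>();

        // Hypothetical stand-in for ObserveSubOrder: (OrderId, Quantity) pairs.
        var source = new Subject<Tuple<string, int>>();

        var subscription = source
            .GroupBy(order => order.Item1)
            .Subscribe(group =>
            {
                // Outer OnNext: fires once per distinct key.
                log.Add("group " + group.Key);

                // Inner subscription, made synchronously inside the outer
                // OnNext, so it is attached before the first element is pushed.
                group.Subscribe(order =>
                    log.Add("item " + order.Item1 + " " + order.Item2));
            });

        source.OnNext(Tuple.Create("A", 10)); // new key: group + item
        source.OnNext(Tuple.Create("A", 20)); // existing key: item only
        source.OnNext(Tuple.Create("B", 5));  // new key: group + item

        subscription.Dispose();
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Two elements with key "A" yield a single "group A" entry followed by two "item A" entries, which matches the single outer OnNext seen in the question's logs.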
Upvotes: 0