Reputation: 77
Given an IObservable(Of T), how can we transform it into an IObservable(Of List(Of T)) that emits a list whose elements are grouped by some key?
Using the GroupBy, Select and Scan operators, I've managed to partition the source into observables that produce lists of all elements for each key. I don't know how to further concatenate those lists into a single one.
Dim source = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}.ToObservable()
Dim keySelector = Function(element As Integer) As Integer
        Return element Mod 3
    End Function
Dim result = source.GroupBy(Of Integer)(keySelector) _
    .Select(Function(gr)
            Return gr.Scan(New List(Of Integer), _
                Function(integers, current)
                    integers.Add(current)
                    Return integers
                End Function)
        End Function)
result.Subscribe(Sub(gr) gr.Subscribe(Sub(lst)
        Console.WriteLine(String.Join(",", lst))
    End Sub))
It produces the following output:
1
2
3
1,4
2,5
3,6
1,4,7
2,5,8
3,6,9
1,4,7,10
while I need it to be:
1
1,2
1,2,3
1,4,2,3
1,4,2,5,3
1,4,2,5,3,6
1,4,7,2,5,3,6
1,4,7,2,5,8,3,6
1,4,7,2,5,8,3,6,9
1,4,7,10,2,5,8,3,6,9
Upvotes: 1
Views: 325
Reputation: 77
This is not an exact answer to my question, as it does not produce the requested output type, but taking performance into account, I decided to test a new approach based on @Carsten's suggestion to use a Dictionary. My source produces values very quickly and I might not need to process every value as it arrives, so I might apply .Throttle or .Sample instead. Given this, I collect all values for each key into lists and emit only a Dictionary(Of Integer, IList(Of T)). On the subscriber side, after throttling or sampling, the received dictionary is flattened.
Dim source = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}.ToObservable()
Dim keySelector = Function(element As Integer) As Integer
        Return element Mod 3
    End Function
' Pair each value with its key, then fold the pairs into a dictionary of per-key lists.
Dim result = source.Select(Function(i)
            Return New With {.remainder = keySelector(i), .value = i}
        End Function) _
    .Scan(ImmutableDictionary(Of Integer, IImmutableList(Of Integer)).Empty, _
        Function(accumulate, current)
            Dim builder = accumulate.ToBuilder()
            If Not builder.ContainsKey(current.remainder) Then
                builder.Add(current.remainder, ImmutableList(Of Integer).Empty)
            End If
            Dim currentList = builder(current.remainder)
            builder(current.remainder) = currentList.Add(current.value)
            Return builder.ToImmutable()
        End Function)
' Flatten the dictionary only after throttling, on the subscriber side.
result.Throttle(TimeSpan.FromMilliseconds(100)) _
    .Subscribe(Sub(dictionary)
            Console.WriteLine( _
                String.Join(",", dictionary.SelectMany(Function(pair)
                        Return pair.Value
                    End Function)))
        End Sub)
It produces this:
3,6,9,1,4,7,10,2,5,8
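The funny thing is that ImmutableDictionary(Of TKey, TValue) does not enumerate its keys in the order they were added (key 0 comes out first above, although key 1 was added first), whereas Dictionary(Of TKey, TValue) in practice tends to keep insertion order as long as nothing is removed. Neither type documents a guaranteed enumeration order. For now this is not really a concern.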
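If the order of the flattened output ever does matter, one option is to sort the pairs by key before flattening rather than relying on the dictionary's enumeration order. The following is only a minimal sketch: FlattenByKey is a hypothetical helper name, and it assumes the System.Collections.Immutable package is available.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Collections.Immutable
Imports System.Linq

Module FlattenSketch
    ' Hypothetical helper: flattens the per-key lists in ascending key order,
    ' so the result no longer depends on how the dictionary enumerates.
    Function FlattenByKey(
            dictionary As IEnumerable(Of KeyValuePair(Of Integer, IImmutableList(Of Integer)))
        ) As List(Of Integer)
        Return dictionary.OrderBy(Function(pair) pair.Key) _
            .SelectMany(Function(pair) pair.Value.AsEnumerable()) _
            .ToList()
    End Function

    Sub Main()
        ' Build the same per-key lists the Scan above ends up with (keys are value Mod 3).
        Dim builder = ImmutableDictionary.CreateBuilder(Of Integer, IImmutableList(Of Integer))()
        builder.Add(1, ImmutableList.Create(1, 4, 7, 10))
        builder.Add(2, ImmutableList.Create(2, 5, 8))
        builder.Add(0, ImmutableList.Create(3, 6, 9))

        ' Prints 3,6,9,1,4,7,10,2,5,8
        Console.WriteLine(String.Join(",", FlattenByKey(builder.ToImmutable())))
    End Sub
End Module
```

Sorting by key gives a stable order, but not the order in which keys first appeared in the source. For that, the keys would have to be tracked separately.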
Upvotes: 0
Reputation: 117027
This does what you need:
Dim result = _
    Observable _
        .Create(Of List(Of Integer))( _
            Function(o)
                Dim keysFound = 0
                Dim keyOrder = New Dictionary(Of Integer, Integer)
                Return _
                    source _
                        .Do( _
                            Sub(x)
                                Dim k = keySelector(x)
                                If Not keyOrder.ContainsKey(k) Then
                                    keyOrder.Add(k, keysFound)
                                    keysFound = keysFound + 1
                                End If
                            End Sub) _
                        .Scan( _
                            New List(Of Integer), _
                            Function(integers, current)
                                integers.Add(current)
                                Return integers
                            End Function) _
                        .Select(Function(integers) _
                            integers.OrderBy(Function(x) _
                                keyOrder(keySelector(x))).ToList()) _
                        .Subscribe(o)
            End Function)
result.Subscribe(Sub(gr) Console.WriteLine(String.Join(",", gr)))
I get this result:
1
1,2
1,2,3
1,4,2,3
1,4,2,5,3
1,4,2,5,3,6
1,4,7,2,5,3,6
1,4,7,2,5,8,3,6
1,4,7,2,5,8,3,6,9
1,4,7,10,2,5,8,3,6,9
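A possibly shorter variant of the same idea, offered only as a sketch and not as part of the answer above: accumulate the elements with Scan, then regroup a snapshot with LINQ's Enumerable.GroupBy. That operator is documented to yield groups in order of each key's first appearance and to keep source order within a group. RegroupByKey is a hypothetical helper name, and keySelector is declared as an explicit Func(Of Integer, Integer) here for type inference.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Linq
Imports System.Reactive.Linq

Module RegroupSketch
    ' Hypothetical helper: returns a new list in which elements sharing a key
    ' are adjacent, with keys ordered by first appearance in the input.
    Function RegroupByKey(items As IEnumerable(Of Integer),
                          keySelector As Func(Of Integer, Integer)) As List(Of Integer)
        Return items.GroupBy(keySelector) _
            .SelectMany(Function(g) g.AsEnumerable()) _
            .ToList()
    End Function

    Sub Main()
        Dim keySelector As Func(Of Integer, Integer) = Function(x) x Mod 3
        Dim source = Enumerable.Range(1, 10).ToObservable()

        ' Accumulate everything seen so far, then emit a regrouped copy per element.
        Dim result = source _
            .Scan(New List(Of Integer),
                  Function(seen, current)
                      seen.Add(current)
                      Return seen
                  End Function) _
            .Select(Function(seen) RegroupByKey(seen, keySelector))

        result.Subscribe(Sub(lst) Console.WriteLine(String.Join(",", lst)))
    End Sub
End Module
```

Like the OrderBy in the answer, this reprocesses the whole accumulated list on every element, so the cost per emission grows with the number of elements seen so far.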
Upvotes: 2