francezu13k50

Reputation: 77

How to project IObservable into a list of elements grouped by key in Rx?

Given an IObservable(Of T), how can we transform it into an IObservable(Of List(Of T)) that emits a list whose elements are grouped by some key? Using the GroupBy, Select and Scan operators I've managed to partition the source into observables that each produce the list of all elements for one key. I don't know how to further concatenate those lists into a single one.

    Dim source = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}.ToObservable()

    Dim keySelector = Function(element As Integer) As Integer
                          Return element Mod 3
                      End Function

    Dim result = source.GroupBy(Of Integer)(keySelector) _
                 .Select(Function(gr)
                             Return gr.Scan(New List(Of Integer), _
                                            Function(integers, current)
                                                integers.Add(current)
                                                Return integers
                                            End Function)
                         End Function)

    result.Subscribe(Sub(gr) gr.Subscribe(Sub(lst)
                                              Console.WriteLine(String.Join(",", lst))
                                          End Sub))

It produces the following output:

1
2
3
1,4
2,5
3,6
1,4,7
2,5,8
3,6,9
1,4,7,10

while I need it to be:

1
1,2
1,2,3
1,4,2,3
1,4,2,5,3
1,4,2,5,3,6
1,4,7,2,5,3,6
1,4,7,2,5,8,3,6
1,4,7,2,5,8,3,6,9
1,4,7,10,2,5,8,3,6,9

Upvotes: 1

Views: 325

Answers (2)

francezu13k50

Reputation: 77

This is not an exact answer to my question, as it does not produce the requested output type, but for performance reasons I decided to test a different approach based on @Carsten's suggestion to use a Dictionary. My source produces values very fast and I may not need to process every value as it arrives, so I can apply .Throttle or .Sample instead. Given this, I collect all values for each key into lists and emit only a Dictionary(Of Integer, IList(Of T)). On the subscriber side, after throttling or sampling, the received dictionary is flattened.

    Dim source = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}.ToObservable()

    Dim keySelector = Function(element As Integer) As Integer
                          Return element Mod 3
                      End Function

    Dim result = source.Select(Function(i)
                                   Return New With {.remainder = keySelector(i), .value = i}
                               End Function) _
                        .Scan(ImmutableDictionary(Of Integer, IImmutableList(Of Integer)).Empty, _
                              Function(accumulate, current)
                                  Dim builder = accumulate.ToBuilder()
                                  If Not builder.ContainsKey(current.remainder) Then
                                      builder.Add(current.remainder, ImmutableList(Of Integer).Empty)
                                  End If
                                  Dim currentList = builder(current.remainder)
                                  builder(current.remainder) = currentList.Add(current.value)
                                  Return builder.ToImmutable()
                              End Function)


    result.Throttle(TimeSpan.FromMilliseconds(100)) _
        .Subscribe(Sub(dictionary)
                       Console.WriteLine( _
                           String.Join(",", dictionary.SelectMany(Function(pair)
                                                                      Return pair.Value
                                                                  End Function)))
                   End Sub)

It produces this :

3,6,9,1,4,7,10,2,5,8

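The funny thing is that ImmutableDictionary(Of TKey, TValue) appears to keep its keys sorted as they are added (the output above comes out in key order 0, 1, 2), while Dictionary(Of TKey, TValue) does not. Neither type documents a guaranteed enumeration order, so this should not be relied on. For now this is not really a concern.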
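
If the flattened order does matter later, one option is to impose it explicitly when flattening rather than rely on how the dictionary happens to enumerate. A minimal sketch, assuming that ordering by key is acceptable; the plain Dictionary and the module name FlattenSketch are illustrative stand-ins, not part of the code above:

    Imports System
    Imports System.Collections.Generic
    Imports System.Linq

    Module FlattenSketch
        Sub Main()
            ' Hypothetical snapshot of the accumulated dictionary,
            ' with keys added in the order 1, 2, 0.
            Dim dictionary = New Dictionary(Of Integer, List(Of Integer)) From {
                {1, New List(Of Integer) From {1, 4, 7, 10}},
                {2, New List(Of Integer) From {2, 5, 8}},
                {0, New List(Of Integer) From {3, 6, 9}}
            }

            ' Sort the pairs by key first, then flatten the value lists.
            Dim flattened = dictionary _
                .OrderBy(Function(pair) pair.Key) _
                .SelectMany(Function(pair) pair.Value)

            ' Prints 3,6,9,1,4,7,10,2,5,8
            Console.WriteLine(String.Join(",", flattened))
        End Sub
    End Module

The same OrderBy call could be placed in front of the SelectMany in the subscriber, so the result no longer depends on the dictionary implementation.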

Upvotes: 0

Enigmativity

Reputation: 117027

This does what you need:

    Dim result = _
        Observable _
            .Create(Of List(Of Integer))( _
                Function (o)
                    Dim keysFound = 0
                    Dim keyOrder = New Dictionary(Of Integer, Integer)
                    Return _
                        source _
                            .Do( _
                                Sub (x)
                                    Dim k = keySelector(x)
                                    If Not keyOrder.ContainsKey(k) Then
                                        keyOrder.Add(k, keysFound)
                                        keysFound = keysFound + 1
                                    End If
                                End Sub) _
                            .Scan( _
                                New List(Of Integer), _
                                Function(integers, current)
                                    integers.Add(current)
                                    Return integers
                                End Function) _
                            .Select(Function(integers) _
                                integers.OrderBy(Function (x) _
                                    keyOrder(keySelector(x))).ToList()) _
                            .Subscribe(o)
                End Function)

    result.Subscribe(Sub(gr) Console.WriteLine(String.Join(",", gr)))

I get this result:

1
1,2
1,2,3
1,4,2,3
1,4,2,5,3
1,4,2,5,3,6
1,4,7,2,5,3,6
1,4,7,2,5,8,3,6
1,4,7,2,5,8,3,6,9
1,4,7,10,2,5,8,3,6,9
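
A shorter variant of the same idea may also work. This is only a sketch, assuming it is acceptable to regroup the whole prefix on every element (roughly O(n) work per emission). It relies on the documented behaviour of Enumerable.GroupBy, which yields groups in the order their keys first appear and keeps elements within a group in source order, so no separate key-order dictionary is needed. The module name GroupedPrefixSketch is illustrative, and keySelector is declared as a Func here so that type inference is unambiguous:

    Imports System
    Imports System.Collections.Generic
    Imports System.Linq
    Imports System.Reactive.Linq

    Module GroupedPrefixSketch
        Sub Main()
            Dim source = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}.ToObservable()
            Dim keySelector As Func(Of Integer, Integer) = Function(element) element Mod 3

            Dim result = source _
                .Scan(New List(Of Integer)(), _
                      Function(seen, current)
                          ' Copy instead of mutating, so every emitted list is its own snapshot.
                          Dim snapshot = New List(Of Integer)(seen)
                          snapshot.Add(current)
                          Return snapshot
                      End Function) _
                .Select(Function(seen) seen.GroupBy(keySelector) _
                                           .SelectMany(Function(g) g.AsEnumerable()) _
                                           .ToList())

            ' The last line printed is 1,4,7,10,2,5,8,3,6,9
            result.Subscribe(Sub(lst) Console.WriteLine(String.Join(",", lst)))
        End Sub
    End Module

Copying the list in Scan costs extra allocations, but it avoids handing the same mutable list to every subscriber.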

Upvotes: 2
