Dipu

Reputation: 8339

Convert Stream<Stream<T>> to Stream<T>

I am using the rxdart package to handle streams in Dart, and I am stuck on a peculiar problem.

Please have a look at this dummy code:

final userId = BehaviorSubject<String>();

Stream<T> getStream(String uid) {
  // a sample code that returns a stream
  return BehaviorSubject<T>().stream;
}

final Observable<Stream<T>> oops = userId.map((uid) => getStream(uid));

Now I want to convert the oops variable to get only Observable<T>.

I find this difficult to explain clearly, but let me try. I have a stream A, and I map each event of stream A to another stream B. That leaves me with a Stream<Stream<B>>, a kind of nested stream. I only want to listen to the latest values produced by this pattern. How can I achieve this?

Upvotes: 7

Views: 3342

Answers (4)

patryk

Reputation: 19

You need to call the asyncExpand method on your stream; it lets you transform each element into a sequence of asynchronous events.
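As a minimal sketch of that suggestion, using only dart:async: the getStream below is a hypothetical stand-in for the one in the question, and the user IDs are made up for illustration.

```dart
import 'dart:async';

// Hypothetical stand-in for the question's getStream: every uid maps to
// the same short, finite stream.
Stream<int> getStream(String uid) => Stream.fromIterable([1, 2, 3, 4]);

Future<void> main() async {
  final userIds = Stream.fromIterable(['a', 'b', 'c']);

  // asyncExpand listens to one inner stream at a time and only moves on
  // to the next source event after the current inner stream is done.
  final Stream<int> flat = userIds.asyncExpand(getStream);

  print(await flat.toList()); // [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4]
}
```

Note that asyncExpand handles the inner streams strictly one after another. If an inner stream never completes (a subject that stays open, for example), later source events are never processed, so this fits the sequential case rather than the "latest only" case.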

Upvotes: 0

Dipu

Reputation: 8339

I will list several ways to flatten a Stream<Stream<T>> into a single Stream<T>.

1. Using pure Dart

As answered by @lrn, this is a pure Dart solution:

Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
  await for (var stream in source) yield* stream;
}

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Stream<int> s = flattenStreams(Stream.fromIterable(list).map(getStream));
    s.listen(print);
}

Outputs: 1 2 3 4 1 2 3 4 1 2 3 4

2. Using Observable.flatMap

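Observable has a flatMap method that maps each item to a stream, listens to every one of those streams, and merges their events into the output stream. Because the inner streams are merged rather than concatenated, their events may interleave: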

import 'package:rxdart/rxdart.dart';

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Observable<int> s = Observable.fromIterable(list).flatMap(getStream);
    s.listen(print);
}

Outputs: 1 2 3 4 1 2 3 4 1 2 3 4

3. Using Observable.switchLatest

Convert a Stream that emits Streams (aka a "Higher Order Stream") into a single Observable that emits the items emitted by the most-recently-emitted of those Streams.

This is the solution I was looking for! I just needed the latest output emitted by the internal stream.

import 'package:rxdart/rxdart.dart';

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Observable<int> s = Observable.switchLatest(
            Observable.fromIterable(list).map(getStream));
    s.listen(print);
}

Outputs: 1 1 1 2 3 4
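To make the "most recently emitted stream wins" behaviour concrete, here is a rough sketch of the same idea using only dart:async. The name switchLatestSketch and the controllers in main are hypothetical and exist only for illustration; this is not rxdart's implementation.

```dart
import 'dart:async';

// Hypothetical helper: forwards events only from the newest inner stream.
Stream<T> switchLatestSketch<T>(Stream<Stream<T>> source) {
  final controller = StreamController<T>();
  StreamSubscription<Stream<T>>? outerSub;
  StreamSubscription<T>? innerSub;
  var outerDone = false;

  // Close the output once the outer stream and the current inner are done.
  void closeIfFinished() {
    if (outerDone && innerSub == null) controller.close();
  }

  controller.onListen = () {
    outerSub = source.listen(
      (inner) {
        innerSub?.cancel(); // drop the previous inner stream
        innerSub = inner.listen(
          controller.add,
          onError: controller.addError,
          onDone: () {
            innerSub = null;
            closeIfFinished();
          },
        );
      },
      onError: controller.addError,
      onDone: () {
        outerDone = true;
        closeIfFinished();
      },
    );
  };
  controller.onCancel = () async {
    await innerSub?.cancel();
    await outerSub?.cancel();
  };
  return controller.stream;
}

Future<void> main() async {
  final outer = StreamController<Stream<int>>();
  final first = StreamController<int>.broadcast();
  final second = StreamController<int>.broadcast();
  final seen = <int>[];
  final finished = switchLatestSketch(outer.stream).forEach(seen.add);

  // Let pending stream events be delivered before the next step.
  Future<void> tick() => Future<void>.delayed(Duration.zero);

  outer.add(first.stream);
  await tick();
  first.add(1);
  await tick();
  outer.add(second.stream); // from here on, `first` is ignored
  await tick();
  first.add(2); // dropped: `first` is no longer the latest stream
  second.add(10);
  await tick();
  outer.close();
  second.close();
  first.close();
  await finished;
  print(seen); // [1, 10]
}
```

Explicit controllers are used in main so that the timing is deterministic: the event added to the first stream after the second one arrives never reaches the output.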

Upvotes: 5

Aditya

Reputation: 2246

If you want a Stream<Stream<T>> to return a Stream<T>, you basically need to flatten the stream. The flatMap function is what you would use here (the code in this answer uses Java's stream API):

public static void main(String[] args) {
    StreamTest<String> st = new StreamTest<>();
    List<String> l = Arrays.asList("a", "b", "c");
    // map produces a Stream<Stream<String>>; flatMap flattens it
    Stream<String> s = l.stream().map(i -> st.getStream(i)).flatMap(i -> i);
}

// instance method of the generic class StreamTest<T>
Stream<T> getStream(T uid) {
    // a sample code that returns a stream
    return Stream.of(uid);
}

If you need the first object, then use the findFirst() method

public static void main(String[] args) {
    StreamTest<String> st = new StreamTest<>();
    List<String> l = Arrays.asList("a", "b", "c");
    String str = l.stream().map(i -> st.getStream(i)).flatMap(i -> i).findFirst().get();
}

Upvotes: 0

lrn

Reputation: 71903

It's somewhat rare to have a Stream<Stream<Something>>, so it isn't something that there is much explicit support for.

One reason is that there are several (at least two) ways to combine a stream of streams of things into a stream of things.

  1. Either you listen to each stream in turn, waiting for it to complete before starting on the next, and then emit the events in order.

  2. Or you listen on each new stream the moment it becomes available, and then emit the events from any stream as soon as possible.

The former is easy to write using async/await:

Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
  await for (var stream in source) yield* stream;
}

The latter is more complicated because it requires listening on more than one stream at a time and combining their events. (If only StreamController.addStream allowed more than one stream at a time, it would be much easier.) You can use the StreamGroup class from package:async for this:

import "package:async/async.dart" show StreamGroup;

Stream<T> mergeStreams<T>(Stream<Stream<T>> source) {
  var sg = StreamGroup<T>();
  source.forEach(sg.add).whenComplete(sg.close);
  // This doesn't handle errors in [source].
  // Maybe insert
  //   .catchError((e, s) {
  //     sg.add(Future<T>.error(e, s).asStream());
  //   })
  // before `.whenComplete` if you worry about errors in [source].
  return sg.stream;
}
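For comparison, here is a rough sketch of the same merging behaviour written with only dart:async, which shows why it takes more bookkeeping than the sequential version. The name mergeAllSketch and the controllers in main are hypothetical; in real code StreamGroup is likely the better choice.

```dart
import 'dart:async';

// Hypothetical helper: listens to every inner stream as soon as it
// arrives and forwards events in the order they occur.
Stream<T> mergeAllSketch<T>(Stream<Stream<T>> source) {
  final controller = StreamController<T>();
  final subs = <StreamSubscription<dynamic>>[];
  var open = 1; // the outer stream plus every inner stream still running

  // The output closes only after the outer and all inner streams are done.
  void finishOne() {
    if (--open == 0) controller.close();
  }

  controller.onListen = () {
    subs.add(source.listen(
      (inner) {
        open++;
        subs.add(inner.listen(
          controller.add,
          onError: controller.addError,
          onDone: finishOne,
        ));
      },
      onError: controller.addError,
      onDone: finishOne,
    ));
  };
  // Cancelling a subscription that is already done is harmless.
  controller.onCancel = () => Future.wait(subs.map((s) => s.cancel()));
  return controller.stream;
}

Future<void> main() async {
  final outer = StreamController<Stream<int>>();
  final a = StreamController<int>();
  final b = StreamController<int>();
  final seen = <int>[];
  final finished = mergeAllSketch(outer.stream).forEach(seen.add);

  // Let pending stream events be delivered before the next step.
  Future<void> tick() => Future<void>.delayed(Duration.zero);

  outer.add(a.stream);
  outer.add(b.stream);
  await tick();
  a.add(1);
  await tick();
  b.add(10);
  await tick();
  a.add(2);
  await tick();
  outer.close();
  a.close();
  b.close();
  await finished;
  print(seen); // [1, 10, 2]
}
```

Unlike flattenStreams, events from the second stream arrive while the first is still open, so the output interleaves both.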

Upvotes: 3
