Reputation: 2091
Below example prints integers from 1 to 10 and a list of (7, 8, 9, 10)
public void streamCollect() {
ConnectableFlux<Integer> connect = Flux.range(1, 10)
.publish();
connect.subscribe(v -> System.out.println("1: " + v));
connect
.filter(number -> number > 6)
.collectList()
.subscribe(v -> System.out.println("4: " + v));
connect.connect();
}
Result:
1: 1
1: 2
1: 3
1: 4
1: 5
1: 6
1: 7
1: 8
1: 9
1: 10
4: [7, 8, 9, 10]
Next example should produce the same result but instead prints out only numbers from 1 to 10 but no list. Why?
public void streamCollect() {
ConnectableFlux<Integer> connect = Flux.<Integer>create(emitter -> {
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.forEach(t -> emitter.next(t));
}).publish();
connect.subscribe(v -> System.out.println("1: " + v));
connect
.filter(number -> number > 6)
.collectList()
.subscribe(v -> System.out.println("4: " + v));
connect.connect();
}
Result:
1: 1
1: 2
1: 3
1: 4
1: 5
1: 6
1: 7
1: 8
1: 9
1: 10
Upvotes: 2
Views: 2661
Reputation: 28301
The collectList waits for the onComplete signal, which you never produce in your create lambda
Upvotes: 4