Ajay Kumar
Ajay Kumar

Reputation: 3250

Spring WebFlux - Convert Flux to List<Object>

I am learning Spring WebFlux.

My Entity goes like this:

@Table("users")
public class User {
    @Id
    private Integer id;
    private String name;
    private int age;
    private double salary;
}

I have a Repository (R2 using H2 Database) like below:

public interface UserRepository extends ReactiveCrudRepository<User,Integer> {
   
}

And my controller is:

    @Autowired
    private UserRepository userRepository;

    private static List<User> userList = new ArrayList<>();

    @PostConstruct
    public void initializeStockObjects() {
        User stock1 = new User(11, "aaaa", 123, 123);
        User stock2 = new User(12, "bbb", 123, 123);
        User stock3 = new User(13, "ccc", 123, 123);
        userList.add(stock1);
        userList.add(stock2);
        userList.add(stock3);
    }

    @RequestMapping(value = "/livelistofusers", method = RequestMethod.GET, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<List<User>> getUsers() {
        return getUserData(userList);
    }

    public Flux<List<User>> getUserData(List<User> userList) {
       Flux<Long> interval = Flux.interval(Duration.ofSeconds(3));
       interval.subscribe((i) -> userList.forEach(user -> addNewUser(user)));
       Flux<List<User>> transactionFlux = Flux.fromStream(Stream.generate(() -> userList));
       return Flux.zip(interval, transactionFlux).map(Tuple2::getT2);
     }

All good till this point. I am able to return the the entire list of users every 3 seconds to the view. No issues at all here.

Now, I want to send the Flue i.e. Flux flux2 = userRepository.findAll() to the view. That means, instead of return getUserData(userList); how can I do return getUserData(flux2(...what should I do here ???... I tried couple of things but I end up making the Blocking list instead of Non-Blocking ...)); ?

Question: How can I achieve this? i.e. How can I send the entire Flux every 3 seconds to my view. I am feeling lost here and clueless. Any relevant help links or solution will be greatly appreciated.

Edit:

As per Nipuna's comments I tried this:

@RequestMapping(value = "/livelistofusersall", method = RequestMethod.GET, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<List<User>>  getUsersall() {
        Flux<Long> interval = Flux.interval(Duration.ofSeconds(3));
        interval.subscribe((i) -> userRepository.findAll());
        Flux<List<User>> transactionFlux = userRepository.findAll().collectList().flatMapMany(Flux::just);
        return Flux.zip(interval, transactionFlux).map(Tuple2::getT2);
    }

But now at my context path, the list loads but "only once" after a wait of 3 seconds. What I am missing here?

Upvotes: 13

Views: 39318

Answers (1)

Nipuna Saranga
Nipuna Saranga

Reputation: 1200

You can use collectList() operator in Flux for this which gives a Mono of List.

userRepository.findAll().collectList().flatMapMany(Flux::just);

Upvotes: 18

Related Questions