mojtab23

Reputation: 2675

RxJava: Can I invoke the `Subject.onNext` method from different threads?

I have a Spring web server, and I want to create a chat room for every N (for example, 10) clients that send a request to my controller.

Every request to the server is handled on its own thread. How can I collect every N requests and create, for example, a room for each group? I think RxJava has a solution for this, so how can I implement it? If RxJava can't do this, what is the best alternative?

Update 1:

With the help of @pavan-kumar's answer, I created this:

@RestController
public class GameController {

    private final PublishSubject<Integer> subject;
    private final AtomicInteger counter = new AtomicInteger(0);

    @Autowired
    public GameController(PublishSubject<Integer> subject) {
        this.subject = subject;
    }

    @PostConstruct
    public void init() {
        // Print every group of 10 emitted integers as "[ 1, 2, ..., 10, ]".
        subject.buffer(10).subscribe(
                integers -> {
                    StringBuilder builder = new StringBuilder("[ ");
                    for (Integer integer : integers) {
                        builder.append(integer).append(", ");
                    }
                    System.out.println(builder.append("]").toString());
                });
    }

    // Each request is handled on its own thread, so onNext is called concurrently here.
    @RequestMapping(value = "/game", method = RequestMethod.GET)
    public void findNewGame() {
        int i = counter.incrementAndGet();
        subject.onNext(i);
    }
}

So the current question is: "Can I invoke the Subject.onNext method from different threads?"

Upvotes: 2

Views: 761

Answers (2)

akarnokd

Reputation: 69997

Not directly. The Observable contract requires onNext calls to be serialized (never concurrent), so you have to provide serialization in some way, or use the toSerialized() method and communicate only with the returned Subject<T, R> instance.

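PublishSubject<Integer> ps = PublishSubject.create();
// Wrap the subject so that concurrent onNext calls are serialized.
Subject<Integer, Integer> subject = ps.toSerialized();
subject.subscribe(System.out::println);
// Call onNext on the serialized wrapper, not on ps.
subject.onNext(1);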
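
To illustrate what "providing serialization in some way" can mean, here is a minimal plain-Java sketch that does not use RxJava at all. The names SerializedBatcher and SerializedBatcherDemo are hypothetical, invented for this illustration. Under a single lock, the sketch only mimics the effect of a serialized subject followed by buffer(10): items offered from many threads are grouped into fixed-size lists, and each full list is passed to a callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper (not an RxJava class): groups items offered from any
// thread into fixed-size batches and passes each full batch to a callback.
final class SerializedBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> onBatch;
    private List<T> current = new ArrayList<>();

    SerializedBatcher(int batchSize, Consumer<List<T>> onBatch) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.onBatch = onBatch;
    }

    // synchronized: only one thread at a time may add an item or run the
    // callback, so batches are built and delivered without overlapping.
    synchronized void offer(T item) {
        current.add(item);
        if (current.size() == batchSize) {
            List<T> full = current;
            current = new ArrayList<>();
            onBatch.accept(full);
        }
    }
}

final class SerializedBatcherDemo {
    // Starts `threads` workers that each offer `perThread` integers,
    // waits for them, and returns the full batches that were delivered.
    static List<List<Integer>> run(int threads, int perThread, int batchSize) {
        // Written only inside the batcher's lock, read only after join().
        List<List<Integer>> batches = new ArrayList<>();
        SerializedBatcher<Integer> batcher =
                new SerializedBatcher<Integer>(batchSize, batches::add);
        AtomicInteger counter = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    batcher.offer(counter.incrementAndGet());
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        // 5 threads x 20 items = 100 items, so 10 full batches of 10.
        System.out.println(run(5, 20, 10).size() + " rooms created");
    }
}
```

The callback runs while the lock is held, which keeps deliveries from overlapping but also means a slow callback blocks other callers. Items that do not fill a batch simply stay pending in this sketch.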

Upvotes: 2

Pavan Kumar

Reputation: 4820

Though the use case is not very clear to me, the approach below would probably help.

Regarding "Every request to server has its own thread": use a static Observable shared among all threads. Maybe you can call onNext every time a new user establishes a connection.

Once you have such an Observable, you can subscribe to it through buffer, as shown below.

Observable.range(1, 50).buffer(10).subscribe(n -> System.out.println(n.get(0)));

Observable.range(1, 50) stands in for your static Observable, which emits an event every time a connection is established. buffer collects the items into a List and emits that List once the specified number of items (10) has been emitted. You can subscribe to this and take appropriate action as needed.
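
To make the grouping concrete, the following plain-Java sketch mimics what buffer(10) does to a finite sequence such as range(1, 50). It does not use RxJava; BufferSketch and its buffer and range helpers are hypothetical names introduced only for this illustration. It also shows the trailing partial list that is produced when the source ends before a group fills up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the count-based buffer operator on a finite list.
final class BufferSketch {
    // Splits `source` into consecutive lists of at most `count` items.
    static <T> List<List<T>> buffer(List<T> source, int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        for (int start = 0; start < source.size(); start += count) {
            int end = Math.min(start + count, source.size());
            out.add(new ArrayList<>(source.subList(start, end)));
        }
        return out;
    }

    // Returns `count` consecutive integers beginning at `start`.
    static List<Integer> range(int start, int count) {
        List<Integer> values = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            values.add(start + i);
        }
        return values;
    }

    public static void main(String[] args) {
        // Five groups of ten; the first elements are 1, 11, 21, 31, 41.
        for (List<Integer> group : buffer(range(1, 50), 10)) {
            System.out.println(group.get(0));
        }
        // 25 items give two full groups and one partial group of 5.
        System.out.println(buffer(range(1, 25), 10).get(2).size());
    }
}
```

In the chat-room case the source never completes, so a group with fewer than 10 clients would stay pending until enough requests arrive.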

Upvotes: 1
