Reputation: 2675
I have a Spring web server and I want to create a chat room for every N (for example, 10) clients that request my controller.
Every request to the server runs on its own thread. How can I collect every N requests and create, for example, a room for them? I think RxJava has a solution for this, so how can I implement it? If it can't be done this way, what is the best alternative?
With the help of @pavan-kumar's answer, I created this:
@RestController
public class GameController {

    private final PublishSubject<Integer> subject;
    private AtomicInteger counter = new AtomicInteger(0);

    @Autowired
    public GameController(PublishSubject<Integer> subject) {
        this.subject = subject;
    }

    @PostConstruct
    public void init() {
        subject.buffer(10).subscribe(
            integers -> {
                StringBuilder builder = new StringBuilder("[ ");
                for (Integer integer : integers) {
                    builder = builder.append(integer).append(", ");
                }
                String s = builder.append("]").toString();
                System.out.println(s);
            });
    }

    @RequestMapping(value = "/game", method = RequestMethod.GET)
    public void findNewGame() {
        int i = counter.addAndGet(1);
        subject.onNext(i);
    }
}
So my current question is: "Can I invoke the Subject.onNext method from different threads?"
Upvotes: 2
Views: 761
Reputation: 69997
Not directly. You have to provide serialization in some way, or use the toSerialized() method and communicate with the returned Subject<T, R> instance.
PublishSubject<Integer> ps = PublishSubject.create();
Subject<Integer, Integer> subject = ps.toSerialized();
subject.subscribe(System.out::println);
subject.onNext(1);
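To illustrate what "provide serialization in some way" can mean, here is a minimal sketch that guards the shared buffer with a lock so that many request threads can call onNext safely. It is plain Java, not the RxJava API: SerializedBatcher and SerializedBatcherDemo are hypothetical names, and the room size of 10 and the 8-thread pool are assumptions chosen to mirror the question.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: groups items arriving from many threads into lists of a fixed size. */
final class SerializedBatcher<T> {
    private final int size;
    private final Consumer<List<T>> onBatch;
    private List<T> current = new ArrayList<>();

    SerializedBatcher(int size, Consumer<List<T>> onBatch) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
        this.onBatch = onBatch;
    }

    /** Safe to call from any thread: the lock lets only one caller touch the buffer at a time. */
    synchronized void onNext(T item) {
        current.add(item);
        if (current.size() == size) {
            List<T> full = current;
            current = new ArrayList<>();
            // The callback runs under the lock, so it should stay short.
            onBatch.accept(full);
        }
    }
}

final class SerializedBatcherDemo {
    /** Sends ids 1..requests from a pool of worker threads and returns the groups that were formed. */
    static List<List<Integer>> run(int requests, int threads, int roomSize) {
        List<List<Integer>> rooms = Collections.synchronizedList(new ArrayList<>());
        SerializedBatcher<Integer> batcher = new SerializedBatcher<>(roomSize, rooms::add);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 1; i <= requests; i++) {
            final int id = i;
            pool.execute(() -> batcher.onNext(id)); // stands in for one HTTP request thread
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return rooms;
    }

    public static void main(String[] args) {
        List<List<Integer>> rooms = run(100, 8, 10);
        System.out.println(rooms.size() + " rooms created");
        rooms.forEach(System.out::println);
    }
}
```

Which ids end up in which room depends on thread scheduling, but every room holds exactly 10 ids and no id is lost or duplicated. With RxJava, toSerialized() gives the same guarantee that onNext calls never overlap, without hand-written locking.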
Upvotes: 2
Reputation: 4820
Though the use case is not very clear to me, the approach below would probably help.
"Every request to server has its own thread": use a static Observable shared among all threads (in practice a Subject, since you need to call onNext on it). Maybe you can call onNext every time a new user establishes a connection.
Once you have such an Observable, you can subscribe to it via buffer, as shown below.
Observable.range(1, 50).buffer(10).subscribe(n -> System.out.println(n.get(0)));
Observable.range(1, 50) stands in for your static Observable, which emits an event every time a connection is established. buffer merges the items into a List and emits that List once the specified number of items (10) has been emitted. You can subscribe to this and take appropriate action as needed.
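To make the grouping concrete, the following sketch reproduces in plain Java what buffer(10) does to the values 1 to 50. It is an illustration only, not RxJava's implementation: BufferSketch and its buffer and range methods are hypothetical helpers written for this example.

```java
import java.util.ArrayList;
import java.util.List;

final class BufferSketch {
    /** Splits items into consecutive lists of at most count elements, keeping arrival order. */
    static <T> List<List<T>> buffer(List<T> items, int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        for (int start = 0; start < items.size(); start += count) {
            int end = Math.min(start + count, items.size());
            out.add(new ArrayList<>(items.subList(start, end)));
        }
        return out;
    }

    /** Returns count consecutive integers beginning at start, like Observable.range(start, count). */
    static List<Integer> range(int start, int count) {
        List<Integer> values = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            values.add(start + i);
        }
        return values;
    }

    public static void main(String[] args) {
        // Prints the first element of each group: 1, 11, 21, 31, 41 (one per line).
        for (List<Integer> group : buffer(range(1, 50), 10)) {
            System.out.println(group.get(0));
        }
    }
}
```

One detail matters for the chat-room case: RxJava's buffer(count) emits a final, shorter list only when the source completes. A Subject that never completes therefore holds an incomplete group until enough further requests arrive, so a room with fewer than 10 clients is never created on its own.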
Upvotes: 1