Reputation: 10117
I'm creating an user event system using JDK 9 Flow API, so I have a room (which implements Flow.Subscriber<Notification>
), it may have many users and each user can offer (dispatch) updates at any time.
When a user enters the room, I subscribe the updates on the room user.subscribe(this)
. But there is no unsubscribe, how can I unsubscribe the user when he leaves the room?
public abstract class Room implements Flow.Subscriber<Notification> {
private Flow.Subscription subscription;
public void addUser(User user) {
user.subscribe(this);
}
public void removeUser(User user) {
// How can I unsubscribe the user?
}
@Override
public void onSubscribe(final Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onError(final Throwable throwable) {
// ...
}
@Override
public void onNext(final Notification notification) {
// ...
subscription.request(1);
}
@Override
public void onComplete() {
// User left
}
}
User class:
public class User extends SubmissionPublisher<Notification> {
....
public int offer(Notification item) {
return super.offer(item, (sub, msg) -> false);
}
}
Upvotes: 5
Views: 1017
Reputation: 7824
Rather than having Room
implement Subscriber<Notification>
, you can have a helper class, UserSubscriber
that implements Subscriber<Notification>
with one instance per User
. This can record the Subscription
and cancels it via a delegate method.
A Map<User,UserSubscriber>
keep tracks of the subscriptions.
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
public class RoomAndUserTest {
// The message that will be sent
static class Notification {
String message;
Notification(String message) {
this.message = message;
}
@Override
public String toString() {
return message;
}
}
// The User which can publish messages
static class User extends SubmissionPublisher<Notification> {
String name;
User(String message) {
this.name = message;
}
// Cause the use to send a message
public void great(String message) {
this.submit(new Notification(message));
}
@Override
public void close() {
super.close();
}
@Override
public String toString() {
return name;
}
}
// The Room than can subscribe to multiple Users
static class Room {
String name;
Map<User, UserSubscriber> subscriptions = new HashMap<>();
// Used to record messages for degugging
List<String> messages = new ArrayList<>();
Room(String message) {
this.name = message;
}
// called from the UserSubscriber onNext method
public void process(User user, Notification item) {
messages.add(user + ": " + item);
}
// When a User is added a new UserSubscriber is created
// and added to the list of subscribers
public void addUser(User user) {
var sub = new UserSubscriber(this, user);
user.subscribe(sub);
subscriptions.put(user, sub);
}
// When a user is removed cancel the subscription
public void removeUser(User user) {
var sub = subscriptions.get(user);
if (sub != null) {
sub.cancel();
subscriptions.remove(user);
}
}
}
// Used to manage a subscription
// has a reference to the room
static class UserSubscriber implements Subscriber<Notification> {
private Subscription subscription;
Room room;
User user; // not strictly necessay
public UserSubscriber(Room room, User user) {
super();
this.room = room;
this.user = user;
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
// Calls the room.process() method.
@Override
public void onNext(Notification item) {
room.process(user, item);
subscription.request(1);
}
// delegate method to cancle the subscription
public void cancel() {
subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
}
}
Here is a test for basic function
@Test
public void test_add_and_remove_users() {
Room room = new Room("Living Room");
User alice = new User("Alice");
User bob = new User("Bob");
room.addUser(alice);
assertEquals(1,room.subscriptions.size()); // check that alice is in the room
assertEquals(1,alice.getSubscribers().size()); // check that alice is subscribed
alice.great("Hello"); // alice sends a message
// wait for the message to be processed
ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.MILLISECONDS);
assertEquals(1,room.messages.size());
assertEquals("Alice: Hello", room.messages.get(0));
room.messages.clear();
room.addUser(bob); // add a second user
bob.great("Hi"); // bob sends a message
ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.MILLISECONDS);
assertEquals(1,room.messages.size());
assertEquals("Bob: Hi", room.messages.get(0));
room.messages.clear();
alice.great("Hello"); // alice sends a message
// wait for the message to be processed
ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.MILLISECONDS);
assertEquals(1,room.messages.size());
assertEquals("Alice: Hello", room.messages.get(0));
room.messages.clear();
assertEquals(1,alice.getSubscribers().size()); // check that alice is subscribed
assertEquals(2,room.subscriptions.size()); // check that alice and bob is still subscribed
room.removeUser(alice);
ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.MILLISECONDS);
assertEquals(0,alice.getSubscribers().size()); // check that alice is no longer subscribed
assertEquals(1,room.subscriptions.size()); // check that bob is still subscribed
}
}
Upvotes: 0