António Almeida
António Almeida

Reputation: 10117

Java Flow.Subscriber - How can I unsubscribe?

I'm creating an user event system using JDK 9 Flow API, so I have a room (which implements Flow.Subscriber<Notification>), it may have many users and each user can offer (dispatch) updates at any time.

When a user enters the room, I subscribe the updates on the room user.subscribe(this). But there is no unsubscribe, how can I unsubscribe the user when he leaves the room?

public abstract class Room implements Flow.Subscriber<Notification> {
    private Flow.Subscription subscription;

    public void addUser(User user) {
        user.subscribe(this);
    }

    public void removeUser(User user) {
        // How can I unsubscribe the user?
    }

    @Override
    public void onSubscribe(final Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onError(final Throwable throwable) {
        // ...
    }

    @Override
    public void onNext(final Notification notification) {
        // ...
        subscription.request(1);
    }

    @Override
    public void onComplete() {
        // User left
    }
}

User class:

public class User extends SubmissionPublisher<Notification> {

    ....

    public int offer(Notification item) {
        return super.offer(item, (sub, msg) -> false);
    }
}

Upvotes: 5

Views: 1017

Answers (1)

Salix alba
Salix alba

Reputation: 7824

Rather than having Room implement Subscriber<Notification>, you can have a helper class, UserSubscriber that implements Subscriber<Notification> with one instance per User. This can record the Subscription and cancels it via a delegate method. A Map<User,UserSubscriber> keep tracks of the subscriptions.

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;

public class RoomAndUserTest {
    // The message that will be sent
    static class Notification {
        String message;

        Notification(String message) {
            this.message = message;
        }

        @Override
        public String toString() {
            return message;
        }
    }

    // The User which can publish messages
    static class User extends SubmissionPublisher<Notification> {
        String name;

        User(String message) {
            this.name = message;
        }
        // Cause the use to send a message
        public void great(String message) {
            this.submit(new Notification(message));
        }

        @Override
        public void close() {
            super.close();
        }

        @Override
        public String toString() {
            return name;
        }
    }
    // The Room than can subscribe to multiple Users
    static class Room {
        String name;
        Map<User, UserSubscriber> subscriptions = new HashMap<>();
        // Used to record messages for degugging
        List<String> messages = new ArrayList<>();
        Room(String message) {
            this.name = message;
        }

        // called from the UserSubscriber onNext method
        public void process(User user, Notification item) {
            messages.add(user + ": " + item);
        }

        // When a User is added a new UserSubscriber is created
        // and added to the list of subscribers
        public void addUser(User user) {
            var sub = new UserSubscriber(this, user);
            user.subscribe(sub);
            subscriptions.put(user, sub);
        }
        // When a user is removed cancel the subscription
        public void removeUser(User user) {
            var sub = subscriptions.get(user);
            if (sub != null) {
                sub.cancel();
                subscriptions.remove(user);
            }
        }
    }

    // Used to manage a subscription
    // has a reference to the room
    static class UserSubscriber implements Subscriber<Notification> {
        private Subscription subscription;
        Room room;
        User user; // not strictly necessay 

        public UserSubscriber(Room room, User user) {
            super();
            this.room = room;
            this.user = user;
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        // Calls the room.process() method.
        @Override
        public void onNext(Notification item) {
            room.process(user, item);
            subscription.request(1);
        }

        // delegate method to cancle the subscription
        public void cancel() {
            subscription.cancel();
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
        }

        @Override
        public void onComplete() {
        }


    }

Here is a test for basic function


    @Test
    public void test_add_and_remove_users() {
        Room room = new Room("Living Room");
        User alice = new User("Alice");
        User bob = new User("Bob");
        room.addUser(alice);
        assertEquals(1,room.subscriptions.size()); // check that alice is in the room
        assertEquals(1,alice.getSubscribers().size()); // check that alice is subscribed
        
        alice.great("Hello"); // alice sends a message
        // wait for the message to be processed
        ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.MILLISECONDS);
        assertEquals(1,room.messages.size());
        assertEquals("Alice: Hello", room.messages.get(0));
        room.messages.clear();
        
        room.addUser(bob); // add a second user
        bob.great("Hi");    // bob sends a message
        ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.MILLISECONDS);
        assertEquals(1,room.messages.size());
        assertEquals("Bob: Hi", room.messages.get(0));
        room.messages.clear();

        alice.great("Hello"); // alice sends a message
        // wait for the message to be processed
        ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.MILLISECONDS);
        assertEquals(1,room.messages.size());
        assertEquals("Alice: Hello", room.messages.get(0));
        room.messages.clear();

        assertEquals(1,alice.getSubscribers().size()); // check that alice is subscribed
        assertEquals(2,room.subscriptions.size()); // check that alice and bob is still subscribed

        room.removeUser(alice);
        ForkJoinPool.commonPool().awaitQuiescence(1000, TimeUnit.MILLISECONDS);
        assertEquals(0,alice.getSubscribers().size()); // check that alice is no longer subscribed
        assertEquals(1,room.subscriptions.size()); // check that bob is still subscribed
    }
    
}

Upvotes: 0

Related Questions