user1518451
user1518451

Reputation: 1845

Synchronization Queue using synchronized block

I'd like to synchronize my app because the server sometimes sends messages to the wrong user. I use a synchronized block to synchronize the queue, but my solution doesn't work - sometimes a user receives a message that was not meant for them.

Here is the code (server.java). InWorker receives messages from users and OutWorker sends messages to users. Every user has their own class (thread), MiniSerwer, which contains two threads: InWorker and OutWorker.

    /**
     * Reads {@code Message} objects from one client's input stream and appends each
     * of them to the queue stored in the shared map under {@code message.id}.
     *
     * <p>Messages whose text is {@code null} or {@code "Bye"} are not enqueued.
     * The worker stops when the stream fails (for example because the client
     * disconnected) or when its thread is interrupted.
     *
     * <p>NOTE(review): {@code message.id} is used as the routing key, so it is
     * presumably the recipient's id. If it is actually the sender's id, messages
     * end up in the wrong user's queue - confirm against the client code.
     */
    class InWorker implements Runnable{

        String slowo=null;
        // Never assigned or read in this class; kept so existing references still compile.
        ObjectOutputStream oos;
        ObjectInputStream ois;
        // Shared routing table: id -> queue of pending messages for that id.
        ConcurrentMap<String,LinkedBlockingQueue<Message>> map;
        Message message=null;

        /**
         * @param ois stream the client's messages are read from
         * @param map shared map from id to that id's message queue
         */
        InWorker(ObjectInputStream ois,ConcurrentMap<String,LinkedBlockingQueue<Message>> map) {
            this.ois=ois;
            this.map=map;
        }

        /**
         * Blocks on the input stream and dispatches every readable message until
         * the stream fails or the thread is interrupted.
         */
        public void run() {
            while(true) {
                try {
                    message = (Message) ois.readObject();
                } catch (ClassNotFoundException e) {
                    // Only this one object is unreadable; keep serving the connection.
                    e.printStackTrace();
                    continue;
                } catch (IOException e) {
                    // The stream is broken or closed; retrying would fail forever.
                    e.printStackTrace();
                    return;
                }

                if(message==null) {
                    continue;
                }
                slowo=message.msg;
                if(slowo==null || slowo.equals("Bye")) {
                    continue;
                }
                if(message.id==null) {
                    // ConcurrentMap implementations such as ConcurrentHashMap reject null keys.
                    Logger.getLogger(InWorker.class.getName()).log(Level.WARNING, "Dropping message without id");
                    continue;
                }

                try {
                    queueFor(message.id).put(message);
                } catch (InterruptedException ex) {
                    Logger.getLogger(InWorker.class.getName()).log(Level.SEVERE, null, ex);
                    // Preserve the interrupt status and stop the worker.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /** Returns the queue registered for {@code key}, atomically creating it if absent. */
        private LinkedBlockingQueue<Message> queueFor(String key) {
            LinkedBlockingQueue<Message> existing = map.get(key);
            if(existing!=null) {
                return existing;
            }
            LinkedBlockingQueue<Message> created = new LinkedBlockingQueue<Message>();
            // putIfAbsent returns the queue another thread registered first, if any.
            LinkedBlockingQueue<Message> raced = map.putIfAbsent(key, created);
            return raced!=null ? raced : created;
        }
    }

/**
 * Takes messages from the queue registered under this worker's id in the shared
 * map and writes each of them to the client's output stream.
 *
 * <p>The worker blocks while its queue is empty instead of polling, and stops
 * when the stream fails or when its thread is interrupted.
 */
class OutWorker implements Runnable{

    String tekst=null;
    ObjectOutputStream oos=null;
    String id;
    Message message;
    // Shared routing table: id -> queue of pending messages for that id.
    ConcurrentMap<String,LinkedBlockingQueue<Message>> map;

    /**
     * @param oos stream the messages are written to
     * @param id  key of the queue this worker drains
     * @param map shared map from id to that id's message queue
     */
    OutWorker(ObjectOutputStream oos,String id,ConcurrentMap<String,LinkedBlockingQueue<Message>> map) {
        this.oos=oos;
        this.id=id;
        this.map=map;
    }

    /**
     * Drains this worker's queue until the stream fails or the thread is interrupted.
     */
    public void run() {
        // Resolved once: nothing in the visible code removes or replaces map entries.
        // NOTE(review): re-resolve per message if entries are ever removed elsewhere.
        LinkedBlockingQueue<Message> pending = queueFor(id);
        while(true) {
            try {
                // take() blocks until a message is available, so no busy-waiting is needed.
                message = pending.take();
            } catch (InterruptedException ex) {
                Logger.getLogger(OutWorker.class.getName()).log(Level.SEVERE, null, ex);
                // Stop here; falling through would write the previous message a second time.
                Thread.currentThread().interrupt();
                return;
            }
            try {
                oos.writeObject(message);
                oos.flush();
            } catch (IOException e) {
                // The connection is gone; further writes would fail as well.
                e.printStackTrace();
                return;
            }
        }
    }

    /** Returns the queue registered for {@code key}, atomically creating it if absent. */
    private LinkedBlockingQueue<Message> queueFor(String key) {
        LinkedBlockingQueue<Message> existing = map.get(key);
        if(existing!=null) {
            return existing;
        }
        LinkedBlockingQueue<Message> created = new LinkedBlockingQueue<Message>();
        // putIfAbsent returns the queue another thread registered first, if any.
        LinkedBlockingQueue<Message> raced = map.putIfAbsent(key, created);
        return raced!=null ? raced : created;
    }
}

Here are the MiniSerwer and Serwer classes:

/**
 * Per-connection task: starts one InWorker and one OutWorker for a client on
 * this task's own cached thread pool.
 *
 * NOTE(review): a Queue is handed to InWorker and OutWorker here, while the
 * versions of those classes shown elsewhere in this file declare a ConcurrentMap
 * parameter - confirm which version is current.
 */
class MiniSerwer implements Runnable {

    // Not assigned anywhere in this class.
    Socket socket = null;
    ExecutorService exec = Executors.newCachedThreadPool();
    ObjectOutputStream oos = null;
    ObjectInputStream ois = null;
    String id;
    Queue<Message> queue = new LinkedList<Message>();

    /**
     * @param oos   stream used to write to the client
     * @param ois   stream used to read from the client
     * @param id    identifier passed on to the OutWorker
     * @param queue message container passed on to both workers
     */
    MiniSerwer(ObjectOutputStream oos, ObjectInputStream ois, String id, Queue<Message> queue) {
        this.oos = oos;
        this.ois = ois;
        this.id = id;
        this.queue = queue;
    }

    /** Submits the reading and the writing worker, then returns. */
    public void run() {
        Runnable reader = new InWorker(ois, queue);        // input stream
        Runnable writer = new OutWorker(oos, id, queue);   // output stream
        exec.execute(reader);
        exec.execute(writer);
        Thread.yield();
    }
}

public class Serwer implements Runnable{

// Listening socket; created in run().
ServerSocket serversocket=null;
// Pool on which run() starts one MiniSerwer task per accepted connection.
ExecutorService exec= Executors.newCachedThreadPool();
// TCP port received by the constructor and used to open the ServerSocket.
int port;
// Sender name taken from the login message of the connection being set up in run().
String id=null;
// Single instance that run() passes to every MiniSerwer, so all connections share it.
Queue<Message> queue=new LinkedList<Message>();
// Not used anywhere in the code visible here.
BufferedReader odczyt=null;

// Object streams of the connection being set up in run(); reassigned per connection.
ObjectInputStream ois=null;
// Login message (first object) read from the connection being set up in run().
Message message=null;
ObjectOutputStream oos=null;

/**
 * Stores the port; the socket itself is only opened when run() executes.
 *
 * @param port TCP port the server will listen on
 */
Serwer(int port) {
    this.port=port;
}

/**
 * Opens the listening socket and accepts clients until accepting fails.
 *
 * <p>For each client the object streams are created, the first object is read as
 * the login message, and a MiniSerwer task is started for the connection. A
 * client whose handshake fails is disconnected and the server keeps accepting
 * others. The listening socket is closed when the loop ends.
 */
public void run() {
    try {
        serversocket=new ServerSocket(port);
        while(true) {
            // A failure here means the listening socket is unusable: leave the loop.
            Socket socket = serversocket.accept();
            try {
                /* first message is login */
                oos=new ObjectOutputStream(socket.getOutputStream());
                // Send the stream header right away so the peer's ObjectInputStream can open.
                oos.flush();
                ois=new ObjectInputStream(socket.getInputStream());
                Object first = ois.readObject();
                if(!(first instanceof Message)) {
                    System.out.println("Rejected connection: first object is not a login message");
                    closeQuietly(socket);
                    continue;
                }
                message = (Message) first;
                id=message.sender;
                System.out.println(id+" log in to the server");

                exec.execute(new MiniSerwer(oos,ois,id,queue)); // create new thread
            } catch (IOException e) {
                // Only this client is affected; drop it and keep serving the others.
                e.printStackTrace();
                closeQuietly(socket);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
                closeQuietly(socket);
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if(serversocket!=null) {
            try {
                serversocket.close();
            } catch (IOException ignored) {
                // Nothing useful can be done if closing the listener fails.
            }
        }
    }
}

/** Closes a client socket, ignoring any failure while closing. */
private static void closeQuietly(Socket socket) {
    try {
        socket.close();
    } catch (IOException ignored) {
        // The connection is being discarded anyway.
    }
}

/**
 * Starts the server on its own thread.
 *
 * @param args optional; {@code args[0]} is the TCP port to listen on.
 *             When no argument is given the port is 8821.
 */
public static void main(String[] args) {
    int port=8821;
    if(args.length>0) {
        try {
            port=Integer.parseInt(args[0]);
        } catch (NumberFormatException e) {
            System.err.println("Invalid port: "+args[0]);
            return;
        }
    }
    ExecutorService exec=Executors.newCachedThreadPool();
    exec.execute(new Serwer(port));
    // The submitted server task keeps running; shutdown only stops the pool from
    // accepting further tasks, so its threads are released when the server ends.
    exec.shutdown();
}

Can anyone help me?

Edit: I changed the queue to a ConcurrentHashMap, but sometimes messages are still sent to the wrong user. Why?

Upvotes: 0

Views: 1033

Answers (1)

jtahlborn
jtahlborn

Reputation: 53694

This is a classic producer/consumer scenario. Ditch the synchronized blocks and use a BlockingQueue (InWorker calls put() and OutWorker calls take()).

Also, in your Serwer class, you should be creating a new queue per connection, not sharing the same one across all connections.

Upvotes: 5

Related Questions