KisnardOnline

Reputation: 740

Synchronize on DataOutputStream

I have gone through so many tutorials on synchronization that my head is spinning, and I have never truly understood it :(.

I have a Java server (MainServer) that, when a client connects, creates a new thread (ServerThread) with a DataOutputStream.

The client talks to the ServerThread and the ServerThread responds. Every now and then the MainServer will distribute a message to all clients utilizing each ServerThread's DataOutputStream object.

I am quite certain that my intermittent issue occurs because both the MainServer and the ServerThread try to send something to the client at the same time. Therefore I need to lock on the DataOutputStream object. For the life of me I cannot understand this concept any further; every example I read is confusing.

What is the correct way to handle this?

ServerThread's send to client method:

public void replyToOne(String reply){
    try {
        commandOut.writeUTF(reply);
        commandOut.flush();
    } catch (IOException e) {
        logger.fatal("replyToOne", e);
    }
    logger.info(reply);
}

MainServer's distribute to all clients method:

public static void distribute(String broadcastMessage){
    for (Map.Entry<String, Object[]> entry : AccountInfoList.entrySet()) {
        Object[] tmpObjArray = entry.getValue();
        DataOutputStream temporaryCOut = (DataOutputStream) tmpObjArray[INT_COMMAND_OUT]; //can be grabbed while thread is using it
        try {
            temporaryCOut.writeUTF(broadcastMessage);
            temporaryCOut.flush();
        } catch (IOException e) {
            logger.error("distribute: writeUTF", e);
        }
        logger.info(broadcastMessage);  
    }
}

I am thinking I should have something like this in my ServerThread class.

public synchronized DataOutputStream getCommandOut(){
    return commandOut;
}

Is it really that simple? I know this has likely been asked and answered, but I still don't seem to get it without individual help.

Upvotes: 1

Views: 1607

Answers (3)

Muli Yulzary

Reputation: 2569

Just putting my 2 cents:

The way I implement servers is this:

Each server is a thread with one task only: listening for connections. Once it recognizes a connection it generates a new thread to handle the connection's input/output (I call this sub-class ClientHandler).

The server also keeps a list of all connected clients.

ClientHandlers are responsible for user-server interactions. From here, things are pretty simple:

Disclaimer: there are no try-catch blocks here; add them yourself. Of course, you can use thread executors to limit the number of concurrent connections.

Server's run() method:

@Override
public void run(){
 isRunning = true;
 while(isRunning){
  ClientHandler ch = new ClientHandler(serversocket.accept());
  clients.add(ch);
  ch.start();
 }
}

ClientHandler's ctor:

public ClientHandler(Socket client){
 out = new ObjectOutputStream(client.getOutputStream());
 in = new ObjectInputStream(client.getInputStream());
}

ClientHandler's run() method:

@Override
public void run(){
 isConnected = true;
 while(isConnected){
  handle(in.readObject());
 }
}

and handle() method:

private void handle(Object o){
 //Your implementation
}

If you want a unified channel, say for output, then you'll have to synchronize access to it to avoid unexpected results.

There are 2 simple ways to do this:

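  1. Wrap every call to output in a synchronized(this) block.
  2. Use a getter for output (like you did) with the synchronized keyword. Note that this alone only guards fetching the reference; writes made through the returned stream still need their own lock.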
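One way to apply the first option is to route every write through a single synchronized method, so that the lock covers the whole write-and-flush sequence rather than just access to the stream reference. The sketch below assumes a hypothetical wrapper class named ClientChannel (not part of the original code) that both the handler thread and the server would call:

```java
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical wrapper (name is an assumption): the only code that
// ever touches the underlying stream.
class ClientChannel {
    private final DataOutputStream out;

    ClientChannel(DataOutputStream out) {
        this.out = out;
    }

    // The monitor of this object is held for the whole write + flush,
    // so a reply and a broadcast sent from different threads cannot
    // interleave their bytes.
    synchronized void send(String message) throws IOException {
        out.writeUTF(message);
        out.flush();
    }
}
```

With this shape the stream never leaves the wrapper, so there is no way for a caller to write to it without taking the lock.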

Upvotes: 1

Jean Logeart

Reputation: 53839

You are on the right track.

Every statement that writes to the DataOutputStream should be synchronized on that same DataOutputStream, so that it is never accessed concurrently (and two messages can never be interleaved):

public void replyToOne(String reply){
    try {
        synchronized(commandOut) {    // writing block
            commandOut.writeUTF(reply);
            commandOut.flush();
        }
    } catch (IOException e) {
        logger.fatal("replyToOne", e);
    }
    logger.info(reply);
}

And:

public static void distribute(String broadcastMessage){
    for (Map.Entry<String, Object[]> entry : AccountInfoList.entrySet()) {
        Object[] tmpObjArray = entry.getValue();
        DataOutputStream temporaryCOut = (DataOutputStream) tmpObjArray[INT_COMMAND_OUT]; //can be grabbed while thread is using it
        try {
            synchronized(temporaryCOut) {  // writing block
                temporaryCOut.writeUTF(broadcastMessage);
                temporaryCOut.flush();
            }
        } catch (IOException e) {
            logger.error("distribute: writeUTF", e);
        }
        logger.info(broadcastMessage);
    }
}
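To see the effect in isolation, here is a small self-contained sketch. It is not the poster's server: the class name SharedStreamDemo and the in-memory ByteArrayOutputStream standing in for the socket are assumptions made for illustration. Two threads write to one shared DataOutputStream while holding its monitor, and the result is read back to count how many messages arrived intact:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical demo class; an in-memory buffer stands in for the socket.
class SharedStreamDemo {
    // Writes `count` copies of `message`, locking on the stream for each one.
    static Runnable writer(DataOutputStream out, String message, int count) {
        return () -> {
            for (int i = 0; i < count; i++) {
                try {
                    synchronized (out) {   // same lock object in every writer
                        out.writeUTF(message);
                        out.flush();
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        };
    }

    // Runs a "reply" thread and a "broadcast" thread against one stream and
    // returns how many intact messages can be read back afterwards.
    static int run(int perThread) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(sink);
            Thread reply = new Thread(writer(out, "reply", perThread));
            Thread broadcast = new Thread(writer(out, "broadcast", perThread));
            reply.start();
            broadcast.start();
            reply.join();
            broadcast.join();

            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(sink.toByteArray()));
            int intact = 0;
            while (in.available() > 0) {
                String s = in.readUTF();
                if (s.equals("reply") || s.equals("broadcast")) {
                    intact++;
                }
            }
            return intact;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling SharedStreamDemo.run(1000) should read back all 2000 messages, because each writeUTF/flush pair runs under the same lock. The important detail is that both writers lock on the same object; locking on different objects would give no protection.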

Upvotes: 1

rolfl

Reputation: 17707

If this were me.....

I would have a LinkedBlockingQueue on each per-client thread (ServerThread). Then, each time that thread has a moment of idleness on the socket, it checks the queue. If there's a message to send from the queue, it sends it.

Then the server, if it needs to, can just add items to that queue, and they will be sent when the connection has some space.

Add the queue, have a method on the ServerThread something like:

public void addBroadcastMessage(MyData data) {
    broadcastQueue.add(data);
}

and then, on the socket side, have a read loop with a timeout, so that it breaks out of the blocking socket read when the connection is idle, and then just:

while (!broadcastQueue.isEmpty()) {
    MyData data = broadcastQueue.poll();
    // ... send the data ...
}

and you're done.

The LinkedBlockingQueue will manage the locking and synchronization for you.
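Put together, a minimal sketch of the idea could look like the following. The class name Outbox and the use of String in place of MyData are assumptions for illustration; the point is that other threads only ever touch the queue, while the thread that owns the socket is the only one that writes to the stream:

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical per-connection outbox; String stands in for MyData.
class Outbox {
    private final BlockingQueue<String> broadcastQueue = new LinkedBlockingQueue<>();

    // Called by the server (or any other thread). Never touches the socket.
    void addBroadcastMessage(String message) {
        broadcastQueue.add(message);
    }

    // Called only by the thread that owns the stream, for example after a
    // read timeout. Returns the number of messages written.
    int drainTo(DataOutputStream out) throws IOException {
        int sent = 0;
        String message;
        while ((message = broadcastQueue.poll()) != null) {
            out.writeUTF(message);
            sent++;
        }
        if (sent > 0) {
            out.flush();
        }
        return sent;
    }
}
```

Because only one thread ever writes to the stream, no synchronized block is needed around the writes. One possible way to get the idle moments is to set a read timeout with Socket.setSoTimeout and call drainTo when the read throws SocketTimeoutException.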

Upvotes: 1
