Reputation: 1183
I have created a Java socket server which creates a socket server on a specified port and then spawns a RecordWriter object to perform some operation on the data stream obtained from each connection.
I start the program with port as 61000 and numthreads as 2. I also started 3 clients to connect to it. On the client side I could see that all 3 of them connected to the receiver however, the receiver logs indicated only two of them connected.
netstat -an|grep 61000|grep -i ESTABLISHED
indicated total 6 connections as the client and server are being run on the same machine. My doubts are:
Executors.newFixedThreadPool(numThreads);
is allowing only 2 clients to be connected. Here are my codes:
MyReceiver.java
package com.vikas;
import java.net.ServerSocket;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class MyReceiver{
protected int serverPort = -1;
protected int numThreads = -1;
protected boolean isStopped = false;
protected Thread runningThread = null;
protected ExecutorService threadPool = null;
protected static Logger logger = LogManager.getLogger(MyReceiver.class);
protected static ServerSocket serverSocket = null;
protected static Map<String, String> mapConnections = new ConcurrentHashMap<String, String>();
public MyReceiver(int port){
this.serverPort = port;
}
public void run(int numThreads){
this.threadPool = Executors.newFixedThreadPool(numThreads);
try {
logger.info("Starting server on port " + this.serverPort);
MyReceiver.serverSocket = new ServerSocket(this.serverPort, numThreads);
} catch (IOException e) {
//throw new RuntimeException("Cannot open port " + this.serverPort, e);
logger.error("Cannot open port " + this.serverPort, e);
}
while(!isStopped()){
this.threadPool.execute(new MyWriter());
}
if(MyReceiver.mapConnections.isEmpty()){
this.threadPool.shutdown();
//System.out.println("Server Stopped after shutdown.") ;
logger.info("Server Stopped after shutdown.");
}
}
public synchronized boolean isStopped() {
return this.isStopped;
}
public synchronized void stop(){
this.isStopped = true;
try {
MyReceiver.serverSocket.close();
} catch (IOException e) {
//throw new RuntimeException("Error closing server", e);
logger.error("Error closing server", e);
}
}
public static void main(String[] args) {
if(args.length != 2){
System.out.println("Number of input arguements is not equal to 4.");
System.out.println("Usage: java -cp YOUR_CLASSPATH -Dlog4j.configurationFile=/path/to/log4j2.xml com.vikas.MyReceiver <port> <number of threads>");
System.out.println("java -cp \"$CLASSPATH:./MyReceiver.jar:./log4j-api-2.6.2.jar:./log4j-core-2.6.2.jar\" -Dlog4j.configurationFile=log4j2.xml com.vikas.MyReceiver 61000 2");
}
int port = Integer.parseInt(args[0].trim());
int numThreads = Integer.parseInt(args[1].trim());
final MyReceiver myConnection = new MyReceiver(port, topic, brokers);
myConnection.run(numThreads);
/*Thread t = new Thread(myConnection);
t.start();*/
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
//e.printStackTrace();
logger.error("Something went wrong", e);
}
//System.out.println("Stopping Server");
Runtime.getRuntime().addShutdownHook(new Thread()
{
@Override
public void run()
{
logger.info("SocketServer - Receive SIGINT!!!");
logger.info("Stopping Server");
if(!myConnection.isStopped()){
myConnection.stop();
}
logger.info("Server Stopped successfully");
try
{
Thread.sleep(1000);
}
catch (Exception e) {}
}
});
//myConnection.stop();
}
}
MyWriter.java
package com.vikas;
import java.io.InputStreamReader;
import java.io.BufferedReader;
import java.io.IOException;
import java.net.Socket;
import java.util.Properties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class MyWriter implements Runnable{
protected String topic = null;
protected String brokers = null;
protected static Logger logger = LogManager.getLogger(MyWriter.class);
public MyWriter () {
}
public void run() {
while(!MyReceiver.serverSocket.isClosed()){
Socket server = null;
try {
server = MyReceiver.serverSocket.accept();
//System.out.println("Just connected to " + server.getRemoteSocketAddress());
logger.info("Just connected to " + server.getRemoteSocketAddress());
MyReceiver.mapConnections.put(server.getRemoteSocketAddress().toString().trim(), "");
//change for prod deployment //change implemented
String key = null;
String message = null;
char ch;
StringBuilder msg = new StringBuilder();
int value = 0;
try {
BufferedReader in = new BufferedReader(new InputStreamReader(server.getInputStream()));
while((value = in.read()) != -1){
ch = (char)value;
if(ch == 0x0a){
//msg.append(ch);
//System.out.println(msg);
message = msg.toString().trim();
//code change as part of testing in prod
if(message.length() != 0){
//do something
msg.setLength(0);
}
else{
logger.error("Blank String received");
msg.setLength(0);
}
}
else{
msg.append(ch);
}
}
logger.info("Closing connection for client :" + server.getRemoteSocketAddress());
//System.out.println("Closing connection for client :" + this.getClientSocket().getRemoteSocketAddress());
server.close();
MyReceiver.mapConnections.remove(server.getRemoteSocketAddress());
} catch (IOException e) {
//report exception somewhere.
//e.printStackTrace();
logger.error("Something went wrong!!", e);
}
finally{
producer.close();
}
} catch (IOException e) {
if(MyReceiver.serverSocket.isClosed()) {
//System.out.println("Server was found to be Stopped.");
logger.error("Server was found to be Stopped.");
logger.error("Error accepting client connection", e);
break;
}
}
}
}
}
Upvotes: 0
Views: 3114
Reputation: 8392
The backlog
parameter of the ServerSocket
constructor restricts the size of the incoming connection queue not the total number of times you are allowed to successfully call accept()
. If you want to restrict the number of active connections you need to keep track of how many connections you've accepted then when you hit your threshold don't call accept()
again until at least one of the active connections has been closed.
while(!MyReceiver.serverSocket.isClosed()){
Socket server = null;
try {
server = MyReceiver.serverSocket.accept();
//System.out.println("Just connected to " + server.getRemoteSocketAddress());
logger.info("Just connected to " + server.getRemoteSocketAddress());
MyReceiver.mapConnections.put(server.getRemoteSocketAddress().toString().trim(), "");
if (activeConnections == maxConnections) break; // exit accept loop
Upvotes: 4