jvas
jvas

Reputation: 440

TCP Server configuration in Mule - writing into client socket

I am trying to create a mule flow with a TCP inbound endpoint which is a TCP server that listens to a port. When a successful client connection is identified, before receiving any request from the client, I need to write a message into the socket (which lets the client know that I am listening), only after which the client sends me further requests. This is how I do it with a sample java program :

import java.net.*; 
import java.io.*; 

public class TCPServer 
{ 
 public static void main(String[] args) throws IOException 
   { 
    ServerSocket serverSocket = null; 

    try { 
         serverSocket = new ServerSocket(4445); 
        } 
    catch (IOException e) 
        { 
         System.err.println("Could not listen on port: 4445."); 
         System.exit(1); 
        } 

    Socket clientSocket = null; 
    System.out.println ("Waiting for connection.....");

    try { 
         clientSocket = serverSocket.accept(); 
        } 
    catch (IOException e) 
        { 
         System.err.println("Accept failed."); 
         System.exit(1); 
        } 

    System.out.println ("Connection successful");
    System.out.println ("Sending output message - .....");

    //Sending a message to the client to indicate that the server is active
    PrintStream pingStream = new PrintStream(clientSocket.getOutputStream());
    pingStream.print("Server listening");
    pingStream.flush();

    //Now start listening for messages 
    System.out.println ("Waiting for incoming message - .....");
    PrintWriter out = new PrintWriter(clientSocket.getOutputStream(),true); 
    BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); 

    String inputLine; 

    while ((inputLine = in.readLine()) != null) 
        { 
         System.out.println ("Server: " + inputLine); 
         out.println(inputLine); 

         if (inputLine.equals("Bye.")) 
             break; 
        } 

    out.close(); 
    in.close(); 
    clientSocket.close(); 
    serverSocket.close(); 
   } 
} 

I have tried to use Mule's TCP inbound endpoint as a server, but I am not able to see how I can identify a successful connection from the client, inorder to trigger the outbound message. The flow gets triggered only when a message is sent across from the client. Is there a way I can extend the functionality of the Mule TCP connector and have a listener which could do the above requirement?

Based on the answer provided, this is how I implemented this -

public class TCPMuleOut extends TcpMessageReceiver {

    boolean InitConnection = false;
    Socket clientSocket = null;

    public TCPMuleOut(Connector connector, FlowConstruct flowConstruct,
            InboundEndpoint endpoint) throws CreateException {
        super(connector, flowConstruct, endpoint);
    }

    protected Work createWork(Socket socket) throws IOException {
        return new MyTcpWorker(socket, this);
    }


    protected class MyTcpWorker extends TcpMessageReceiver.TcpWorker {

        public MyTcpWorker(Socket socket, AbstractMessageReceiver receiver)
                throws IOException {
            super(socket, receiver);
            // TODO Auto-generated constructor stub
        }

        @Override
        protected Object getNextMessage(Object resource) throws Exception {
            if (InitConnection == false) {

                clientSocket = this.socket;
                logger.debug("Sending logon message");
                PrintStream pingStream = new PrintStream(
                        clientSocket.getOutputStream());
                pingStream.print("Log on message");
                pingStream.flush();
                InitConnection = true;
            }

            long keepAliveTimeout = ((TcpConnector) connector)
                    .getKeepAliveTimeout();

            Object readMsg = null;
            try {
                // Create a monitor if expiry was set
                if (keepAliveTimeout > 0) {
                    ((TcpConnector) connector).getKeepAliveMonitor()
                            .addExpirable(keepAliveTimeout,
                                    TimeUnit.MILLISECONDS, this);
                }

                readMsg = protocol.read(dataIn);

                // There was some action so we can clear the monitor
                ((TcpConnector) connector).getKeepAliveMonitor()
                        .removeExpirable(this);

                if (dataIn.isStreaming()) {
                }

                return readMsg;
            } catch (SocketTimeoutException e) {
                ((TcpConnector) connector).getKeepAliveMonitor()
                        .removeExpirable(this);
                System.out.println("Socket timeout");
            } finally {
                if (readMsg == null) {
                    // Protocols can return a null object, which means we're
                    // done
                    // reading messages for now and can mark the stream for
                    // closing later.
                    // Also, exceptions can be thrown, in which case we're done
                    // reading.
                    dataIn.close();
                    InitConnection = false;
                    logger.debug("Client closed");
                }
            }
            return null;
        }
    }
} 

And the TCP connector is as below:

<tcp:connector name="TCP" doc:name="TCP connector"
    clientSoTimeout="100000" receiveBacklog="0" receiveBufferSize="0"
    sendBufferSize="0" serverSoTimeout="100000" socketSoLinger="0"
    validateConnections="true" keepAlive="true">
    <receiver-threading-profile
        maxThreadsActive="5" maxThreadsIdle="5" />
    <reconnect-forever />
    <service-overrides messageReceiver="TCPMuleOut" />
    <tcp:direct-protocol payloadOnly="true" />
</tcp:connector>

Upvotes: 1

Views: 1590

Answers (1)

MarcosNC
MarcosNC

Reputation: 356

What you're trying to do is a little difficult to accomplish but not impossible. The messages are received by the org.mule.transport.tcp.TcpMessageReceiver class, and this class always consumes the data in the input stream to create the message that injects in the flow. However, you could extend that receiver and instruct the TCP module to use yours by adding a service-overrides tag in your flow's tcp connector (documented here) and replacing the messageReceiver element. In your extended receiver you should change the TcpWorker.getNextMessage method in order to send the ack message before read from the input stream. HTH, Marcos.

Upvotes: 1

Related Questions