Reputation: 505
I am using ZMQ in my Java application. I found that it behaves unevenly i.e if I send some 100 message with One consumer say it takes 1 sec then if we go on increasing the consumers the time taken becomes 2,1.5,3 such that. There is no gradual increase or decrease. How can I correct this. Look for my code below
// Broker
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMQStreamer;
public class Broker {
/**
* @param args
*/
public static void main(String[] args)
{
Context context = ZMQ.context(1);
Socket frontEnd = context.socket(ZMQ.PULL);
frontEnd.bind("tcp://*:5555");
Socket backEnd = context.socket(ZMQ.PUSH);
backEnd.bind("tcp://*:5560");
ZMQStreamer zmqStreamer = new ZMQStreamer(context, frontEnd, backEnd);
zmqStreamer.run();
}
}
// Producer
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;
public class Producer
{
public void init()
{
ZMQ.Context context = ZMQ.context(1);
socket = context.socket(ZMQ.PUSH);
socket.connect("tcp://localhost:5555");
}
public void initMessage(String message)
{
this.message = message;
}
public void sendMessage()
{
String sendMessage = System.nanoTime() +"#"+ message;
socket.send(sendMessage.getBytes(), 0);
}
/**
* @param args
*/
public static void main(String[] args)
{
Producer producer = new Producer();
producer.init();
byte[] message = new byte[Integer.parseInt(args[0])];
//message = "Hello".getBytes();
producer.initMessage(new String(message));
for(int i=0;i<100;i++)
{
producer.sendMessage();
}
}
private Socket socket = null;
private String message;
}
//Consumer
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;
public class Consumer
{
public void init()
{
ZMQ.Context context = ZMQ.context(1);
socket = context.socket(ZMQ.PULL);
socket.connect("tcp://localhost:5560");
}
public void reciveMessage()
{
byte[] recived = socket.recv(0);
//System.out.println(recived.length);
long recivedTime = System.nanoTime();
String message = new String(recived);
String[] splitMessage = message.split("#");
long sendTime = Long.parseLong(splitMessage[0]);
System.out.println("Send Time " + sendTime + " RecivedTime "
+ recivedTime + " Time taken " + (recivedTime - sendTime)
+ " Message " + message);
}
/**
* @param args
*/
public static void main(String[] args)
{
Consumer consumer = new Consumer();
consumer.init();
for (int i=0;i<100;i++)
{
consumer.reciveMessage();
}
}
private Socket socket = null;
}
Upvotes: 0
Views: 284
Reputation: 10951
To reliably time a piece of multithreaded code, you're going to need to have some way of syncing the starting and ending times at the collector/sink (which you currently do not have shown to be programmed).
Check out this example from the ZMQ guide, which states the following process as one of the correct ZMQ ways of dividing a dataset:
...
Our supercomputing application is a fairly typical parallel processing model:
We have a ventilator that produces tasks that can be done in parallel. We have a set of workers that process tasks. We have a sink that collects results back from the worker processes.
...
Upvotes: 1