Reputation: 65
I'm using rabbitMQ in order to send tasks to workers (consumers) which are created on the run. Currently, each time a new task is created, a new worker is created. The problem goes like that : -A user creates a task
-A worker is created then the task is sent on the queue for the workers to process
-The worker starts processing the queue (the worker basically sleeps for a time)
-Another user creates a task
-New worker is created and task sent on the queue
-The new worker doesn't process the new task and does absolutly nothing meanwhile, and the new task is processed by the first worker once he's done with the first task
I've checked on the admin part of rabbitmq and there are two consumers bound to the queue, but one of them seems to do all the work while the other just waits.
Here's the code for the worker: public class Worker extends Thread {
private final static String QUEUE_NAME = "Tasks";
private final static String QUEUE_COMPL = "Completed";
public static int id = 0;
private static final String EXCHANGE_NAME = "logs";
public int compteur;
String identifier;
public Worker() {
Worker.id++;
compteur = id;
}
public void run() {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("WORKER " + compteur + " : Message received :" + message);
String taskName = message.split(" ")[0];
String length = message.split(" ")[1];
try {
System.out.println("WORKER " + compteur + " : Commencing job :" + message);
doWork(length);
System.out.println("WORKER " + compteur + " : Job's finished :" + message);
taskName += " done by " + compteur;
// confirm(taskName);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
System.out.println("WORKER " + compteur + " : Waiting for a new task...");
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (IOException ex) {
Logger.getLogger(Worker.class.getName()).log(Level.SEVERE, null, ex);
} catch (TimeoutException ex) {
Logger.getLogger(Worker.class.getName()).log(Level.SEVERE, null, ex);
}
}
private static void doWork(String taskLength) throws InterruptedException {
int temps = Integer.parseInt(taskLength);
Thread.sleep(temps);
}
}
and the code for the part which puts the messages into the queue: public class serveurSD {
private final static String QUEUE_NAME = "Tasks";
private Manager MANAGER = new Manager();
@WebMethod(operationName = "processTask")
public String processTask(@WebParam(name = "message") String txt, @WebParam(name = "duree") int duree) throws IOException, TimeoutException {
if (MANAGER == null){
MANAGER= new Manager();
MANAGER.listen();
}
System.out.println("SERVER : Message received : " + txt + " " + duree);
MANAGER.giveTask();
ConnectionFactory factory = new ConnectionFactory();
String message = txt + " " + duree;
System.out.println("SERVER : Sending message to workers : " + message);
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
channel.close();
connection.close();
return "Your task is being processed";
}
}
(Manager is the class creating the workers on the go.)
I'm sorry if a similar question has already been asked but I couldn't find it. Thanks for any possible help :)
Upvotes: 2
Views: 13899
Reputation: 72858
the second parameter of the basicConsume
method is "auto acknowledge". Having this parameter set to true
means the consumer will tell RabbitMQ that the message has been acknowledged, as soon as it receives the message.
When the consumer is set to autoAck true, it is highly likely that it will immediately receive the next available message from the queue, even when basicQos
is set to 1. this happens, because the 1 limit is immediately decremented by the consumer, to say it no longer has any message and it can accept the next one.
Changing the auto ack parameter to false
prevents this problem, when combined with the basic QoS setting of 1, because it forces your consumer to say "hey, i've got a message and i'm currently working on it. don't send me anything else until i'm done."
this allows the other consumer to say "hey, i have a spot open. go ahead and send me the message"
Upvotes: 6
Reputation: 65
Okay, I seem to have found a way to make it work, problem is I don't know why it works, so if anyone knows, I'd be glad to have an explanation
I changed the second argument of channel.basicConsume(QUEUE_NAME, true, consumer);
to false, but I'm not sure to have understood why does it fix my problem.
Upvotes: 0