Reputation: 13845
I have a Camel route that consumes tasks from Apache ActiveMQ. Everything works fine when there is only one consumer on the ActiveMQ queue.
But when the number of consumers is increased to two (or more), the application misbehaves.
Here is my route:
<routeContext id="myRoute" xmlns="http://camel.apache.org/schema/spring">
<route errorHandlerRef="myErrorHandler" id="myErrorRoute">
<from uri="activemq:queue:{{my.queue}}" />
<log loggingLevel="DEBUG" message="Message received from my queue : ${body}"></log>
<multicast>
<pipeline>
<log loggingLevel="DEBUG" message="Adding to redis : ${body}"></log>
<to uri="spring-redis://localhost:6379?serializer=#stringSerializer" />
</pipeline>
<pipeline>
<transform>
<method ref="insertBean" method="myBatchInsertion"></method>
</transform>
<choice>
<when>
<simple> ${body.size()} == ${properties:my.batch.size}</simple>
<log message="Going to insert my batch in database" />
<to uri="mybatis:batchInsert?statementType=InsertList"></to>
<log message="Inserted in my table : ${in.header.CamelMyBatisResult}"></log>
<choice>
<when>
<simple>${properties:my.write.file} == true</simple>
<bean beanType="com.***.***.processors.InsertToFile"
method="processMy(${exchange})" />
<log message="Going to write to file : ${in.header.CamelFileName}" />
<to uri="file://?fileExist=Append&amp;bufferSize=32768"></to>
</when>
</choice>
</when>
</choice>
</pipeline>
</multicast>
</route>
</routeContext>
Below are the beans:
public class InsertBeanImpl {
public List<Out> myOutList = new CopyOnWriteArrayList<Out>();
public List<Out> myBatchInsertion(Exchange exchange) {
if (myOutList.size() >= myBatchSize) {
Logger.sysLog(LogValues.info,this.getClass().getName(),"Reached max PayLoad size : "+myOutList.size() + " , going to clear batch");
myOutList.clear();
}
Out myOut = exchange.getIn().getBody(Out.class);
Logger.sysLog(LogValues.APP_INFO, this.getClass().getName(), myOut.getMasterId()+" | "+"Adding to batch masterId : "+myOut.getMasterId());
synchronized(myOut){
myOutList.add(myOut);
}
Logger.sysLog(LogValues.info, this.getClass().getName(), "Count of batch : "+myOutList.size());
return myOutList;
}
}
public class SaveToFile {
static String currentFileName = null;
static int iSub = 0;
String path;
String absolutePath;
@Autowired
private Utility utility;
public void processMy(Exchange exchange) {
getFileName(exchange, currentFileName, iSub);
}
public void getFileName(Exchange exchange, String outFile, int i) {
exchange.getIn().setBody(getFromJson(exchange));
path = (String) exchange.getIn().getHeader("path");
Calendar date = null;
date = new GregorianCalendar();
NumberFormat format = NumberFormat.getIntegerInstance();
format.setMinimumIntegerDigits(2);
String pathSuffix = "/" + date.get(Calendar.YEAR) + "/"
+ format.format((date.get(Calendar.MONTH) + 1)) + "/"
+ format.format(date.get(Calendar.DAY_OF_MONTH));
String fileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
double megabytes = 100 * 1024 * 1024;
if (outFile != null) {
if (!fileName.equals(outFile.split("_")[0])) {
outFile = null;
i = 0;
}
}
if (outFile == null) {
outFile = fileName + "_" + i;
}
while (new File(path + "/" + pathSuffix + "/" + outFile).length() >= megabytes) {
outFile = fileName + "_" + (++i);
}
absolutePath = path + "/" + pathSuffix + "/" + outFile;
exchange.getIn().setHeader("CamelFileName", absolutePath);
}
public String getFromJson(Exchange exchange) {
synchronized(exchange){
List<Out> body = exchange.getIn().getBody(CopyOnWriteArrayList.class);
Logger.sysLog(LogValues.info, this.getClass().getName(), "body > "+body.size());
String text = "";
for (int i = 0; i < body.size(); i++) {
Out msg = body.get(i);
text = text.concat(utility.convertObjectToJsonStr(msg) + "\n");
}
return text;
}
}
}
As the processors are not synchronized and not thread-safe, the route does not work as expected when there are multiple consumers.
Can anybody tell me how to make my route thread-safe or synchronized?
I tried making the processors synchronized, but it didn't help. Is there any other way to do this?
Upvotes: 2
Views: 4937
Reputation: 1756
Obviously, neither InsertBeanImpl nor SaveToFile is thread-safe. In general, beans used in a Camel route should be stateless, i.e. they should not have mutable fields.
For InsertBeanImpl, it looks like what you really want to do is aggregate multiple messages into one. For such a use case I would consider using the Camel aggregator [1] instead, with which you could implement a thread-safe solution more easily.
[1] http://camel.apache.org/aggregator.html
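To illustrate why the shared list in InsertBeanImpl breaks with two consumers, and what the aggregator would take care of for you, here is a minimal plain-Java sketch of size-based batching. It is not Camel's API: the class name Batcher and its add method are hypothetical, made up for this example. The point is that adding an item, checking the size, and handing off the full batch happen as one atomic step, so each full batch is released exactly once and is never cleared while another thread is still reading it. In Camel itself, the aggregate EIP with its completionSize option covers the same need.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Camel): releases each full batch exactly once. */
final class Batcher<T> {
    private final int batchSize;
    private List<T> current = new ArrayList<>(); // guarded by "this"

    Batcher(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /**
     * Adds an item. Returns the completed batch if this call filled it,
     * otherwise null. The returned list is no longer referenced by the
     * batcher, so the caller can use it without further locking.
     */
    synchronized List<T> add(T item) {
        current.add(item);
        if (current.size() < batchSize) {
            return null;
        }
        List<T> full = current;
        current = new ArrayList<>(); // start a fresh batch instead of clear()
        return full;
    }
}
```

Compared with the bean in the question, the lock is on the shared object rather than on the individual message, and the full batch is swapped out rather than cleared later, so a second consumer cannot empty a list that the first one is about to insert.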
For SaveToFile, I can see no reason for path and absolutePath to be fields. Make them local variables in the getFileName method.
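As a rough sketch of that idea, the file name can be computed purely from the method's arguments and returned, with nothing stored on the bean. The class FileNames and the method dailyFile are hypothetical names for this example, and the size-based rollover from the question is left out. It also uses java.time.format.DateTimeFormatter, which is immutable and thread-safe, whereas SimpleDateFormat is not.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

/** Hypothetical stateless helper: every value lives in parameters or locals. */
final class FileNames {
    // Immutable, thread-safe formatters can safely be shared constants.
    private static final DateTimeFormatter DIR = DateTimeFormatter.ofPattern("yyyy/MM/dd");
    private static final DateTimeFormatter NAME = DateTimeFormatter.ISO_LOCAL_DATE;

    private FileNames() {
    }

    /** Builds basePath/yyyy/MM/dd/yyyy-MM-dd_index without touching any field. */
    static String dailyFile(String basePath, LocalDate date, int index) {
        String dir = basePath + "/" + date.format(DIR);     // local variable
        String name = date.format(NAME) + "_" + index;      // local variable
        return dir + "/" + name;
    }
}
```

The caller would then set the returned value as the CamelFileName header on its own exchange, so two consumers never overwrite each other's path.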
Upvotes: 2
Reputation: 4120
Make your processors thread-safe, and use synchronized sparingly or, better, not at all.
To do that, they must not have any mutable instance variables.
Only shared properties may live there, such as settings that are valid for all threads and never change while any method executes. Then there is no need for synchronization, which hurts performance.
Everything else in Camel is thread-safe, and there is no way to make Camel thread-safe for you if the processor implementation is not.
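A small sketch of what such a processor can look like, in plain Java with no Camel types: the only field is a final setting assigned once in the constructor, and all per-message work happens in local variables. The class name LineJoiner is hypothetical and only stands in for something like getFromJson from the question.

```java
import java.util.List;

/** Hypothetical thread-safe bean: one immutable setting, no per-message state. */
final class LineJoiner {
    private final String separator; // set once, never modified afterwards

    LineJoiner(String separator) {
        this.separator = separator;
    }

    /** Each call builds its result in a local StringBuilder, so calls cannot interfere. */
    String join(List<String> items) {
        StringBuilder text = new StringBuilder();
        for (String item : items) {
            text.append(item).append(separator);
        }
        return text.toString();
    }
}
```

Because no call writes to a field, a single instance can be shared by any number of consumer threads without synchronized.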
Upvotes: 3