Reputation: 151
I have upgraded from mule 3.2.1 to 3.3.1 and I am facing following problem. I am using the custom Aggregator and custom transformer for aggregating multiple files for processing. The problem is that after first time aggregation when I am dropping another set of files, only transformer class is called and no other component is getting execute. There is no error on console either. When I restart the mule, again the process is executing for the first time only.
My transformer class is :
public class FileCorrelationTransformer extends AbstractMessageTransformer {
@Override
public Object transformMessage(MuleMessage message, String arg1)
throws TransformerException {
System.out.println("FileCorelationTransformer called");
System.out.println("=============================");
System.out.println("Extract Name = "
+ message.getOutboundProperty("originalFilename"));
final String fileName = message.getOutboundProperty("filename");
final String extractName = Generate.extractName(fileName);
final ArrayList<ExtractVO> extractVOs = ExtractDAO.getInstance()
.load(extractName);
System.out.println("Group Size: " + extractVOs.get(0).getNoOfParts());
System.out.println("=============================");
message.setCorrelationId(extractName);
message.setCorrelationGroupSize(extractVOs.get(0).getNoOfParts());
return message;
}
}
My aggregator class is :
public class FileCollectionAggregator extends AbstractAggregator {
@Override
protected EventCorrelatorCallback getCorrelatorCallback(
MuleContext muleContext) {
return new CollectionCorrelatorCallback(muleContext, false, "") {
@Override
public MuleEvent aggregateEvents(EventGroup events)
throws AggregationException {
CopyOnWriteArrayList<File> list = new CopyOnWriteArrayList<File>();
try {
for (Iterator<MuleEvent> iterator = events.iterator(); iterator
.hasNext();) {
MuleEvent event = iterator.next();
try {
list.add(new File(event.transformMessageToString()));
} catch (TransformerException e) {
throw new AggregationException(events, null, e);
}
}
} catch (ObjectStoreException e) {
throw new AggregationException(events, null, e);
}
return new DefaultMuleEvent(new DefaultMuleMessage(list,
muleContext), events.getMessageCollectionEvent());
}
};
}
}
My mule flow is :
<vm:connector name="ConnectorSinglePartMonthly"
dynamicNotification="true" doc:name="VM" />
<custom-transformer
class="com.transformers.FileCorrelationTransformer" name="fileMultiPartCorrelationTransformer"
doc:name="Java" />
<flow name="CountTP" doc:name="CountTP">
<file:inbound-endpoint path="c:/test/input"
connector-ref="fileConnector" doc:name="Count CSV">
<file:filename-regex-filter pattern="UGJERT[0-9]*.csv"
caseSensitive="false" />
</file:inbound-endpoint>
<vm:outbound-endpoint connector-ref="ConnectorMultiPartMonthly"
path="monthlyMultiPartFiles" transformer-refs="fileMultiPartCorrelationTransformer"
doc:name="TP Multi VM Out" />
<logger message="Done with the Flow!" doc:name="Logger" />
</flow>
<flow name="MultiPartMonthlyFlow" doc:name="MultiPartMonthlyFlow">
<vm:inbound-endpoint connector-ref="ConnectorMultiPartMonthly"
path="monthlyMultiPartFiles" doc:name="TP Multi Monthly VM In" />
<custom-aggregator class="com.routing.FileCollectionAggregator"
doc:name="Custom Aggregator" />
<component doc:name="Csv Reader">
<prototype-object
class="com.utility.CsvReader" />
</component>
</flow>
My console Output is :
After First Execution
INFO 2013-07-19 16:51:02,296 [[test].fileConnector.receiver.05] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: C:\test\input\UGJERT12.csv
INFO 2013-07-19 16:51:02,311 [[test].fileConnector.receiver.05] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: C:\test\input\UGJERT22.csv
FileCorelationTransformer called
FileCorelationTransformer called
=============================
=============================
Extract Name = UGJERT12.csv
Extract Name = UGJERT22.csv
Group Size: 2
=============================
Group Size: 2
=============================
INFO 2013-07-19 16:51:02,421 [[test].ConnectorMultiPartMonthly.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'ConnectorMultiPartMonthly.dispatcher.6508195'. Object is: VMMessageDispatcher
INFO 2013-07-19 16:51:02,421 [[test].ConnectorMultiPartMonthly.dispatcher.02] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'ConnectorMultiPartMonthly.dispatcher.29350820'. Object is: VMMessageDispatcher
INFO 2013-07-19 16:51:02,421 [[test].ConnectorMultiPartMonthly.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'ConnectorMultiPartMonthly.dispatcher.6508195'. Object is: VMMessageDispatcher
INFO 2013-07-19 16:51:02,421 [[test].ConnectorMultiPartMonthly.dispatcher.02] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'ConnectorMultiPartMonthly.dispatcher.29350820'. Object is: VMMessageDispatcher
Process execution starts and extractName is = UGJERT
After Second Execution
INFO 2013-07-19 16:53:18,530 [[test].fileConnector.receiver.09] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: C:\test\input\UGJERT12.csv
INFO 2013-07-19 16:53:18,546 [[test].fileConnector.receiver.09] org.mule.transport.file.FileMessageReceiver: Lock obtained on file: C:\test\input\UGJERT22.csv
FileCorelationTransformer called
=============================
Extract Name = UGJERT12.csv
Group Size: 2
=============================
FileCorelationTransformer called
=============================
Extract Name = UGJERT22.csv
Group Size: 2
=============================
For every set of files, the correlation id is the name of the file which is unique across different set of files. Cannot anyone tell me what Is wrong with the code.
Upvotes: 1
Views: 570
Reputation: 2319
You can't reuse a correlation id because that's kept on the object store (that explains why it works again if you restart Mule).
You have to use a different correlation id for each of the aggregation groups.
Upvotes: 1