Reputation: 31
I am trying to implement a Spring Integration class that takes an .xml file, parses it, and moves it to an "archived" directory if it is valid or to an error directory if it is invalid.
import com.nagarro.studentapi.integration.queue.StudentSender;
import com.nagarro.studentapi.util.XmlParser;
import org.aopalliance.aop.Advice;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import java.io.File;

@Configuration
@EnableIntegration
public class IntegrationConfiguration {

    private static final String XML = "*.xml";
    private static final String STUDENT = "\\student.xml";

    @Value("${student-api.xmlPath}")
    private String inputPath;

    @Value("${student-api.archivedDestination}")
    private String successPath;

    @Value("${student-api.errorDestination}")
    private String errorPath;

    @Bean
    public MessageChannel messageChannel() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "messageChannel")
    public MessageSource<File> messageProducer() {
        FileReadingMessageSource messageSource = new FileReadingMessageSource();
        messageSource.setDirectory(new File(inputPath));
        messageSource.setFilter(new SimplePatternFileListFilter(XML));
        return messageSource;
    }

    @Bean
    @ServiceActivator(inputChannel = "messageChannel")
    public MessageHandler handler() {
        FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(successPath));
        handler.setFileExistsMode(FileExistsMode.REPLACE);
        handler.setExpectReply(false);
        return handler;
    }

    @Bean
    public IntegrationFlow integrationFlow(XmlParser xmlParser) {
        return IntegrationFlows.from(messageProducer(), spec -> spec.poller(Pollers.fixedDelay(1000)))
                .enrichHeaders(h -> h.headerExpression(FileHeaders.ORIGINAL_FILE, "payload"))
                .convert(String.class)
                .transform((String path) -> xmlParser.parsePath(path))
                .handle("xmlParser", "parsePath", e -> e.advice(errorAdvice()))
                .get();
    }

    @Bean
    public AbstractRequestHandlerAdvice errorAdvice() {
        return new AbstractRequestHandlerAdvice() {
            @Override
            protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
                File file = message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class);
                try {
                    Object result = callback.execute();
                    file.renameTo(new File(successPath, STUDENT));
                    System.out.println("File renamed after success");
                    return result;
                }
                catch (Exception e) {
                    file.renameTo(new File(errorPath, STUDENT));
                    System.out.println("File renamed after failure");
                    throw e;
                }
            }
        };
    }
}

However, whenever callback.execute() is called, I get this error, and I don't quite understand why.
2022-09-06 18:20:07.971 ERROR 32152 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@1135e3d6]; nested exception is java.lang.IllegalArgumentException: No candidate methods found for messages., failedMessage=GenericMessage [payload=Student(firstname=John, lastname=Dose, cnp=123, birthDate=2000-12-12, address=Address(street=a, number=1, city=Craiova, country=Romania), grades=[Grade(discipline=a, date=2021-12-12, grade=10), Grade(discipline=b, date=2021-12-12, grade=9)]), headers={....
Although I have a message handler, I suspect the problem is that I do not override the handle method, but I am unsure how to do that.
Reputation: 121282
You have several problems:

1. You use both @InboundChannelAdapter and IntegrationFlows.from(messageProducer(), ...). This way you create two independent polling endpoints for the same source.
2. The @ServiceActivator is an endpoint that writes the file just read from one of those sources.
3. It is not clear how @InboundChannelAdapter, your @ServiceActivator expectations, and that flow are supposed to fit together.
4. You have .transform((String path) -> xmlParser.parsePath(path)) and then, immediately after it, handle("xmlParser", "parsePath"). The two look essentially the same, but the combination does not make sense: you call the same parsePath() twice with different payloads, and the second call receives the result of the first parsePath() call.

Please revise your logic carefully: right now some of your configuration is misleading and error-prone. I believe the error you got occurs because your parsePath() expects a String, not the Student that we see in the payload for that handle().
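To illustrate the last point, here is a minimal plain-Java sketch of why a second parsePath() call cannot accept the output of the first. It is a simplified reflection check, not Spring Integration's actual method-resolution logic (which also considers type conversion, among other things). Student, XmlParser, and hasCandidate below are hypothetical stand-ins for the classes in the question.

```java
import java.util.Arrays;

/** Hypothetical stand-in for the Student payload from the question. */
final class Student {
    final String firstname;

    Student(String firstname) {
        this.firstname = firstname;
    }
}

/** Hypothetical stand-in for XmlParser: its only parsePath() takes a String. */
class XmlParser {
    public Student parsePath(String path) {
        return new Student("John"); // real XML parsing omitted
    }
}

class CandidateMethodDemo {

    /**
     * Returns true if target has a public one-argument method with the given
     * name whose parameter type can accept the payload as-is.
     */
    static boolean hasCandidate(Object target, String name, Object payload) {
        return Arrays.stream(target.getClass().getMethods())
                .filter(m -> m.getName().equals(name) && m.getParameterCount() == 1)
                .anyMatch(m -> m.getParameterTypes()[0].isInstance(payload));
    }

    public static void main(String[] args) {
        XmlParser parser = new XmlParser();
        String path = "student.xml";

        // Step 1, like .transform(...): the payload is still a String path.
        System.out.println(hasCandidate(parser, "parsePath", path));    // true
        Student student = parser.parsePath(path);

        // Step 2, like .handle("xmlParser", "parsePath"): the payload is now a Student.
        System.out.println(hasCandidate(parser, "parsePath", student)); // false
    }
}
```

In the flow from the question, this suggests keeping only one of the two parsePath() invocations, so that the step carrying the advice still receives the String path.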