Reputation: 19
Here is my Spring Integration snippet:
@Bean
public IntegrationFlow mqttInFlowTopicConnect(JsonToObjectTransformer jsonToAlarmTransformer){
return IntegrationFlow.from("alarmChannel")
.transform(jsonToAlarmTransformer)
.route("payload.alarmType", m -> m
.channelMapping("NewAlarm","newAlarmChannel")
.channelMapping("ChangedAlarm","changedAlarmChannel")
.channelMapping("ClearedAlarm","clearedAlarmChannel")
)
.get();
}
@Bean
public IntegrationFlow newAlarmFlow(){
return IntegrationFlow.from("newAlarmChannel")
.handleReactive(message -> Mono.just(message)
.flatMap(msg -> alarmRepo.save((Alarm)msg.getPayload()))
.then(),
e -> e.requiresReply(false)
);
}
but when It received a newAlarm, it got errors: "no output-channel or replyChannel header available". I read in "https://docs.spring.io/spring-integration/reference/reactive-streams.html", it said that: "Starting with version 6.1, the IntegrationFlowDefinition exposes a convenient handleReactive(ReactiveMessageHandler) terminal operator. Any ReactiveMessageHandler implementation (even just a plain lambda using the Mono API) can be used for this operator. The framework subscribes to the returned Mono automatically...", I don't know where's my error. please help me!
org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@788185c7] (handleNewAlarm.insertNewAlarm.serviceActivator)]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:160) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:390) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:334) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-6.1.13.jar:6.1.13]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-6.1.13.jar:6.1.13]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-6.1.13.jar:6.1.13]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-6.1.13.jar:6.1.13]
at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:238) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:220) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:390) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:334) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-6.1.13.jar:6.1.13]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-6.1.13.jar:6.1.13]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-6.1.13.jar:6.1.13]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-6.1.13.jar:6.1.13]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:536) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:359) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:288) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:252) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:151) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:129) ~[spring-integration-core-6.3.4.jar:6.3.4]
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:56) ~[spring-integration-core-6.3.4.jar:6.3.4]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:531) ~[spring-integration-core-6.3.4.jar:6.3.4] at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:359) ~[spring-integration-core-6.3.4.jar:6.3.4] at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:288) ~[spring-integration-core-6.3.4.jar:6.3.4] at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:252) ~[spring-integration-core-6.3.4.jar:6.3.4] at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:151) ~[spring-integration-core-6.3.4.jar:6.3.4] at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) ~[spring-integration-core-6.3.4.jar:6.3.4] ... 38 common frames omitted
Upvotes: 1
Views: 23