Reputation: 379
Thanks for reading ahead of time. In my main method I have a PublishSubscribeChannel
@Bean(name = "feeSchedule")
public SubscribableChannel getMessageChannel() {
return new PublishSubscribeChannel();
}
In a service that does a long running process it creates a fee schedule that I inject the channel into
@Service
public class FeeScheduleCompareServiceImpl implements FeeScheduleCompareService {
@Autowired
MessageChannel outChannel;
public List<FeeScheduleUpdate> compareFeeSchedules(String oldStudyId) {
List<FeeScheduleUpdate> sortedResultList = longMethod(oldStudyId);
outChannel.send(MessageBuilder.withPayload(sortedResultList).build());
return sortedResultList;
}
}
Now this is the part I'm struggling with. I want to use completable future and get the payload of the event in the future A in another spring bean. I need future A to return the payload from the message. I think want to create a ServiceActivator to be the message end point but like I said, I need it to return the payload for future A.
@org.springframework.stereotype.Service
public class SFCCCompareServiceImpl implements SFCCCompareService {
@Autowired
private SubscribableChannel outChannel;
@Override
public List<SFCCCompareDTO> compareSFCC(String state, int service){
ArrayList<SFCCCompareDTO> returnList = new ArrayList<SFCCCompareDTO>();
CompletableFuture<List<FeeScheduleUpdate>> fa = CompletableFuture.supplyAsync( () ->
{ //block A WHAT GOES HERE?!?!
outChannel.subscribe()
}
);
CompletableFuture<List<StateFeeCodeClassification>> fb = CompletableFuture.supplyAsync( () ->
{
return this.stateFeeCodeClassificationRepository.findAll();
}
);
CompletableFuture<List<SFCCCompareDTO>> fc = fa.thenCombine(fb,(a,b) ->{
//block C
//get in this block when both A & B are complete
Object theList = b.stream().forEach(new Consumer<StateFeeCodeClassification>() {
@Override
public void accept(StateFeeCodeClassification stateFeeCodeClassification) {
a.stream().forEach(new Consumer<FeeScheduleUpdate>() {
@Override
public void accept(FeeScheduleUpdate feeScheduleUpdate) {
returnList new SFCCCompareDTO();
}
});
}
}).collect(Collectors.toList());
return theList;
});
fc.join();
return returnList;
}
}
Was thinking there would be a service activator like:
@MessageEndpoint
public class UpdatesHandler implements MessageHandler{
@ServiceActivator(requiresReply = "true")
public List<FeeScheduleUpdate> getUpdates(Message m){
return (List<FeeScheduleUpdate>) m.getPayload();
}
}
Upvotes: 0
Views: 293
Reputation: 121552
Your question isn't clear, but I'll try to help you with some info.
Spring Integration doesn't provide CompletableFuture
support, but it does provide an async handling and replies.
See Asynchronous Gateway for more information. And also see Asynchronous Service Activator.
outChannel.subscribe()
should come with the MessageHandler
callback, by the way.
Upvotes: 0