Reputation: 129
I wish create a custom message handler to use checkpoints in flows. Besides, those checkpoints will be stored in ElasticSearch.
I created a class Checkpoint:
@Component
public class Checkpoint {
public static final String TASK_HEADER_KEY = "task";
public static CheckpointMessageHandlerSpec warn(String message) {
return new CheckpointMessageHandlerSpec(new CheckpointHandler("WARN", message));
}
}
// ... methods omitted: error, info etc
Next I created CheckpointMessageHandlerSpec:
public class CheckpointMessageHandlerSpec extends MessageHandlerSpec<CheckpointMessageHandlerSpec, CheckpointHandler> {
public CheckpointMessageHandlerSpec(CheckpointHandler checkpointHandler) {
this.target = checkpointHandler;
}
public CheckpointMessageHandlerSpec apply(Message<?> message) {
this.target.handleMessage(message);
return _this();
}
@Override
protected CheckpointHandler doGet() {
throw new UnsupportedOperationException();
}
}
CheckpointHandler, in this class I wish to inject things, like services or repositories from Spring Data:
public class CheckpointHandler extends IntegrationObjectSupport implements MessageHandler {
private String status;
private String message;
// I want inject services or repositories here
public CheckpointHandler(String status, String message) {
this.status = status;
this.message = message;
}
@Override
public void handleMessage(Message<?> message) {
// Test to watch if I have the bean factory. It is always null
this.getBeanFactory();
Expression expression = EXPRESSION_PARSER.parseExpression("'" + this.message + "'");
// Here I intend to persist information of payload/headers with spring-data-elasticSearch repository previously injected
Object obj = expression.getValue(message);
}
}
Finally, a example of use, inside a flow:
@Bean
public IntegrationFlow checkpointFlow(Checkpoint checkpoint) {
return IntegrationFlows.from(Http.inboundChannelAdapter("/checkpointFlow"))
.enrichHeaders(Collections.singletonMap(Checkpoint.TASK_HEADER_KEY, taskName))
.handle(new AppendMessageHandler())
.wireTap(c -> c.handle(m -> checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '").apply(m)))
.handle(m -> log.info("[LOGGING DEMO] {}" , m.getPayload()))
.get();
}
private class AppendMessageHandler implements GenericHandler {
@Override
public String handle(Object payload, Map headers) {
return new StringBuilder().append(testMessage).toString();
}
}
What I miss? Is it possible to do that? I had this idea following this question How to create custom component and add it to flow in spring java dsl?
Thanks!
Upvotes: 2
Views: 1719
Reputation: 121542
Bean can be autowired, if they are, well, exactly beans.
Let take a look into your code one more time!
c.handle(m -> checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '").apply(m))
The real bean here is exactly Lambda :). Sad, of course, but not your custom factory with subsequent apply()
. Your custom code is invoked exactly in the target Lambda for each incoming message, but without aware about BeanFactory
.
To fix your problem you should use your factory as is:
.wireTap(c -> c.handle(checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '")))
And Framework will take care about registration your CheckpointHandler
as a bean and, therefore, autowiring.
AS you may guess already you don't need that apply()
method. Just because there is need to distinguish assemble phase when Java DSL populates a tree for beans. The initialization and registration phase, when that tree is parsed by the Framework and beans are registered in the application context. And, finally, there is a runtime phase, when messages are travel from channel to channel though all those message handler, transformers etc.
Upvotes: 2