Higor Oliveira

Reputation: 129

How to autowire beans inside a Spring Integration custom message handler?

I want to create a custom message handler to record checkpoints in flows. The checkpoints will be stored in Elasticsearch.

I created a class Checkpoint:

@Component
public class Checkpoint {

    public static final String TASK_HEADER_KEY = "task";

    public static CheckpointMessageHandlerSpec warn(String message) {
        return new CheckpointMessageHandlerSpec(new CheckpointHandler("WARN", message));
    }
    // ... methods omitted: error, info etc
}

Next I created CheckpointMessageHandlerSpec:

public class CheckpointMessageHandlerSpec extends MessageHandlerSpec<CheckpointMessageHandlerSpec, CheckpointHandler> {

    public CheckpointMessageHandlerSpec(CheckpointHandler checkpointHandler) {
        this.target = checkpointHandler;
    }

    public CheckpointMessageHandlerSpec apply(Message<?> message) {
        this.target.handleMessage(message);
        return _this();
    }

    @Override
    protected CheckpointHandler doGet() {
        throw new UnsupportedOperationException();
    }
}

Then CheckpointHandler. In this class I want to inject things such as services or Spring Data repositories:

public class CheckpointHandler extends IntegrationObjectSupport implements MessageHandler {

    private String status;
    private String message;

    // I want to inject services or repositories here

    public CheckpointHandler(String status, String message) {
        this.status = status;
        this.message = message;
    }

    @Override
    public void handleMessage(Message<?> message) {
        // Check whether the bean factory is available. It is always null
        this.getBeanFactory();

        Expression expression = EXPRESSION_PARSER.parseExpression("'" + this.message + "'");

        // Here I intend to persist payload/headers information with a previously injected spring-data-elasticsearch repository
        Object obj = expression.getValue(message);
    }
}

Finally, an example of use inside a flow:

@Bean
public IntegrationFlow checkpointFlow(Checkpoint checkpoint) {
    return IntegrationFlows.from(Http.inboundChannelAdapter("/checkpointFlow"))
            .enrichHeaders(Collections.singletonMap(Checkpoint.TASK_HEADER_KEY, taskName))
            .handle(new AppendMessageHandler())
            .wireTap(c -> c.handle(m -> checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '").apply(m)))
            .handle(m -> log.info("[LOGGING DEMO] {}" , m.getPayload()))
            .get();
}

private class AppendMessageHandler implements GenericHandler {

    @Override
    public String handle(Object payload, Map headers) {
        return new StringBuilder().append(testMessage).toString();
    }
}

What am I missing? Is it possible to do this? I got the idea from this question: How to create custom component and add it to flow in spring java dsl?

Thanks!

Upvotes: 2

Views: 1719

Answers (1)

Artem Bilan

Reputation: 121542

Beans can be autowired only if they are, well, actually beans.

Let's take a look at your code one more time!

c.handle(m -> checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '").apply(m))

The real bean here is the lambda :). Sad, of course, but it is not your custom factory with the subsequent apply(). Your custom code is invoked inside the target lambda for each incoming message, and a handler created there is never made aware of the BeanFactory.

To fix your problem, pass the result of your factory method directly:

.wireTap(c -> c.handle(checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '")))

The framework will then take care of registering your CheckpointHandler as a bean and, therefore, of autowiring it.

As you may have guessed already, you don't need that apply() method. You do need to distinguish three phases. First is the assembly phase, when the Java DSL populates a tree of components that will become beans. Next is the initialization and registration phase, when that tree is parsed by the framework and the beans are registered in the application context. Finally, there is the runtime phase, when messages travel from channel to channel through all those message handlers, transformers, etc.
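To make the difference between the phases concrete, here is a minimal plain-Java sketch with no Spring involved. MiniFlow, Handler and CheckpointStore are hypothetical stand-ins invented for this illustration, not Spring Integration classes, and the "injection" is a simple field assignment. It only simulates the idea that the framework can wire what it sees at assembly time, and that an object created inside a lambda at runtime is never seen by it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DslPhasesSketch {

    // Hypothetical stand-in for an injected service or repository.
    static class CheckpointStore {
        final List<String> saved = new ArrayList<>();
    }

    // Hypothetical stand-in for CheckpointHandler: it needs 'store' to do its work.
    static class Handler implements Consumer<String> {
        CheckpointStore store; // assigned by MiniFlow.register(), never by the constructor
        final String status;

        Handler(String status) {
            this.status = status;
        }

        @Override
        public void accept(String payload) {
            if (store == null) {
                throw new IllegalStateException("no dependencies injected");
            }
            store.saved.add(status + ": " + payload);
        }
    }

    // Hypothetical stand-in for the framework (assumption: greatly simplified).
    static class MiniFlow {
        private final List<Consumer<String>> steps = new ArrayList<>();

        // Assembly phase: only collect the components handed over.
        MiniFlow handle(Consumer<String> step) {
            steps.add(step);
            return this;
        }

        // Registration phase: wire every component that is visible as a Handler.
        void register(CheckpointStore store) {
            for (Consumer<String> step : steps) {
                if (step instanceof Handler) {
                    ((Handler) step).store = store;
                }
            }
        }

        // Runtime phase: pass each message through the collected steps.
        void send(String payload) {
            for (Consumer<String> step : steps) {
                step.accept(payload);
            }
        }
    }

    public static void main(String[] args) {
        CheckpointStore store = new CheckpointStore();

        // The handler itself is given to the flow, so registration can wire it.
        MiniFlow direct = new MiniFlow().handle(new Handler("WARN"));
        direct.register(store);
        direct.send("hello");
        System.out.println(store.saved); // prints [WARN: hello]

        // Only the lambda is given to the flow; the Handler is created per message, unwired.
        MiniFlow wrapped = new MiniFlow().handle(m -> new Handler("WARN").accept(m));
        wrapped.register(store);
        try {
            wrapped.send("hello");
        } catch (IllegalStateException e) {
            System.out.println("lambda case: " + e.getMessage()); // prints lambda case: no dependencies injected
        }
    }
}
```

The second flow corresponds to the original `m -> checkpoint.warn(...).apply(m)`: the framework only ever sees the lambda. The first corresponds to passing `checkpoint.warn(...)` straight to `handle()`.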

Upvotes: 2
