Hantsy

Reputation: 9321

Convert Spring Integration Cafe demo from xml config to Java 8 DSL

I am trying to convert the Spring Integration cafe demo to the Java 8 DSL. There are some existing examples in the official Spring Integration Samples.

I want to combine all the good parts of these examples.

The code (which does not work as expected and has some issues) is here.

In the official XML samples, it is easy to set up a request/reply channel on the AMQP gateway. But in the Java 8 DSL, those options seem to be missing.

And when the application runs, it complains "reply channel or output channel is required".

BTW, is there a better option for debugging/unit testing a Spring Integration application?

Update 1: I got confused while writing the coldDrinks flow.

The XML is from the original cafe-amqp project.

<!-- To receive an AMQP Message from a Queue, and respond to its reply-to address, configure an inbound-gateway. -->
    <int-amqp:inbound-gateway
        id="coldDrinksBarista"
        request-channel="coldJsonDrinks"
        queue-names="cold-drinks"
        connection-factory="rabbitConnectionFactory" />

    <int:chain input-channel="coldJsonDrinks">
        <int:json-to-object-transformer type="org.springframework.integration.samples.cafe.OrderItem"/>
        <int:service-activator method="prepareColdDrink">
            <bean class="org.springframework.integration.samples.cafe.xml.Barista"/>
        </int:service-activator>
        <int:object-to-json-transformer content-type="text/x-json"/>
    </int:chain>

How can I convert this into the Java DSL effectively? I added my thoughts inline:

    @Bean
    public IntegrationFlow coldDrinksFlow(AmqpTemplate amqpTemplate) {
        return IntegrationFlows
                .from("coldDrinks")
                .handle(
                        Amqp.outboundGateway(amqpTemplate)
                                .exchangeName(TOPIC_EXCHANGE_CAFE_DRINKS)
                                .routingKey(ROUTING_KEY_COLD_DRINKS)
                )
                .log("coldDrinksFlow")
                .channel(preparedDrinksChannel())
                .get();
    }

    @Bean
    public IntegrationFlow coldDrinksBaristaFlow(ConnectionFactory connectionFactory, Barista barista) {
        return IntegrationFlows
                .from(Amqp.inboundGateway(connectionFactory, QUEUE_COLD_DRINKS)
                        .configureContainer(
                                c -> c.receiveTimeout(10000)
                        )
                        // If a replyChannel is set up here, the `handle` below does not work as expected.
                )
                .handle(OrderItem.class, (payload, headers) -> (Drink) barista.prepareColdDrink(payload))
                // If a channel is added here, the reply does NOT return to `coldDrinksFlow`, which causes
                // another exception: "its requiresReply is set to true..."
                .get();
    }

In my previous experience, I liked to break the whole flow into small flows at the protocol boundaries (HTTP, FTP, etc.), with an inbound endpoint at the beginning and an outbound endpoint at the end. An inbound/outbound gateway is easy to get working without setting a reply channel: by default it should reply over the original route using its built-in protocol instead of channels. In my inboundGateway RSocket example, I don't set a reply channel, but the message still returns through the RSocket routes and is received by the client side (outboundGateway).

Update: Finally it works, check here. One issue I encountered when trying to use AMQP to send and receive object messages: class cast exceptions were thrown, because the TypeId in the headers is NOT changed when using `handle` or another MessageHandler. I had to transform between JSON and objects, like the XML-based cafe-amqp sample does, to finally make it work. What is missing here?
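
For reference, a minimal sketch of how the XML chain above might map to the DSL with explicit JSON transformers. It assumes `Barista` and `OrderItem` from the cafe sample are importable from the same package, reuses the `cold-drinks` queue name and the `text/x-json` content type from the XML, and uses `ColdDrinksBaristaConfig` as a made-up class name. No reply channel is declared: the inbound gateway puts a reply channel into the message headers, and the output of the last endpoint in the flow is sent back to it.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;

// Hypothetical configuration class; Barista and OrderItem come from the cafe sample.
@Configuration
public class ColdDrinksBaristaConfig {

    @Bean
    public IntegrationFlow coldDrinksBaristaFlow(ConnectionFactory connectionFactory, Barista barista) {
        return IntegrationFlows
                // Receives requests from the queue and replies to the AMQP reply-to address.
                .from(Amqp.inboundGateway(connectionFactory, "cold-drinks"))
                // Counterpart of <int:json-to-object-transformer type="...OrderItem"/>.
                .transform(Transformers.fromJson(OrderItem.class))
                // Counterpart of <int:service-activator method="prepareColdDrink">.
                .handle(OrderItem.class, (payload, headers) -> barista.prepareColdDrink(payload))
                // Counterpart of <int:object-to-json-transformer content-type="text/x-json"/>.
                .transform(Transformers.toJson("text/x-json"))
                .get();
    }
}
```

Because the payload crosses the broker as JSON text in both directions, the requesting side has to convert the reply back to an object itself, as the XML sample does.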

Upvotes: 1

Views: 464

Answers (1)

Artem Bilan

Reputation: 121542

There is already an official Java DSL sample in that repo: https://github.com/spring-projects/spring-integration-samples/tree/master/dsl/cafe-dsl.

Yes, there is no AMQP variant, but if you use the Amqp factory from spring-integration-amqp, it should not be too hard to convert that XML config to the Java DSL.

I'm not sure what "bad option" for testing you are using, but spring-integration-test provides enough utilities to simplify testing flows: https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/testing.html#testing

UPDATE

Your problem is here:

<int-amqp:outbound-gateway
    id="coldDrinksBarista"
    request-channel="coldDrinks"
    reply-channel="preparedJsonDrinks"
    exchange-name="cafe-drinks"
    routing-key="drink.cold"
    amqp-template="amqpTemplate" />

whereas your config:

@Bean
public IntegrationFlow coldDrinksFlow(AmqpTemplate amqpTemplate) {
    return IntegrationFlows
            .from("coldDrinks")
            .handle(
                    Amqp.outboundGateway(amqpTemplate)
                            .exchangeName(TOPIC_EXCHANGE_CAFE_DRINKS)
                            .routingKey(ROUTING_KEY_COLD_DRINKS)
            )
            .get();
}

doesn't have a similar replyChannel part. That is why you get "reply channel or output channel is required": you send from a route that does not expect any replies. Probably you just haven't finished designing the whole logic of your application...
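
As a rough sketch of the DSL counterpart, assuming you want the reply to land on a channel named `preparedJsonDrinks` as in the XML: in the Java DSL, the reply of an outbound gateway goes to whatever comes next in the flow, so the `reply-channel` attribute becomes the step declared after `.handle(...)`. The class name `ColdDrinksConfig` is made up for illustration; the exchange and routing key are the literal values from the XML.

```java
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

// Hypothetical configuration class name, used only for this sketch.
@Configuration
public class ColdDrinksConfig {

    @Bean
    public IntegrationFlow coldDrinksFlow(AmqpTemplate amqpTemplate) {
        return IntegrationFlows
                .from("coldDrinks")
                .handle(Amqp.outboundGateway(amqpTemplate)
                        .exchangeName("cafe-drinks")
                        .routingKey("drink.cold"))
                // Plays the role of reply-channel="preparedJsonDrinks" in the XML:
                // the gateway's reply is sent to the next step of the flow.
                .channel("preparedJsonDrinks")
                .get();
    }
}
```

Something still has to subscribe to `preparedJsonDrinks` (or be chained after it), otherwise the reply has nowhere to go.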

Upvotes: 0
