Mohamed Aoutir

Reputation: 705

Bind RabbitMQ consumer using Spring Cloud Stream to RabbitMQ producer

I have two microservices. The first collects XML files from an internal FTP server, transforms them into DTO objects, and publishes them as bytes to RabbitMQ. The second deserializes the incoming bytes from RabbitMQ into DTO objects, maps them to JPA entities, and persists them to the database.

I'd like to configure the RabbitMQ broker between these two microservices as follows:

1) For the microservice that collects the XML files, I edited application.properties as below:

spring.cloud.stream.bindings.output.destination=TOPIC
spring.cloud.stream.bindings.output.group=proactive-policy

2) For the microservice that persists the incoming DTO objects, I configured application.properties as follows:

spring.cloud.stream.bindings.input.destination=TOPIC
spring.cloud.stream.bindings.input.group=proactive-policy

To receive the incoming bytes from RabbitMQ, I'm using the second microservice as a sink:

@EnableJpaAuditing
@EnableBinding(Sink.class)
@SpringBootApplication(scanBasePackages = { "org.proactive.policy.data.cache" })
@RefreshScope
public class ProactivePolicyDataCacheApplication {
    private static Logger logger = LoggerFactory.getLogger(ProactivePolicyDataCacheApplication.class);

    @Autowired
    PolicyService policyService;

    public static void main(String[] args) {
        SpringApplication.run(ProactivePolicyDataCacheApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void input(Message<byte[]> message) throws Exception {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            logger.error("the message is null ");
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }
        byte[] data = message.getPayload();
        if (data.length == 0) {
            logger.warn("Received empty message");
            return;
        }
        logger.info("Got data from policy-collector = " + new String(data, "UTF-8"));
        PolicyListDto policyListDto = (PolicyListDto) SerializationUtils.deserialize(data);
        logger.info("Policies.xml from policy-collector = " + policyListDto.getPolicy().toString());

        policyService.save(policyListDto);
    }

}

But when I open the RabbitMQ console and look at the exchanges and queues, nothing arrives in the queue TOPIC.proactive-policy. Instead, the incoming messages are received in another queue that I haven't configured, named FTPSTREAM.proactive-policy-collector.

Are there any suggestions for resolving this issue?

Upvotes: 0

Views: 314

Answers (1)

Oleg Zhurakousky

Reputation: 6106

A couple of points:

1. There is no such thing as a 'group' for the output binding. A consumer group is a consumer property. Here is the relevant fragment of the Javadoc:

/**
 * Unique name that the binding belongs to (applies to consumers only). Multiple
 * consumers within the same group share the subscription. A null or empty String
 * value indicates an anonymous group that is not shared.
 * @see org.springframework.cloud.stream.binder.Binder#bindConsumer(java.lang.String,
 * java.lang.String, java.lang.Object,
 * org.springframework.cloud.stream.binder.ConsumerProperties)
 */
private String group;

2. The name 'FTPSTREAM.proactive-policy-collector' is definitely not something spring-cloud-stream would generate from the properties you posted, so look through your configuration and see what you have missed.

It tells me that you have some consumer that has its 'destination' named FTPSTREAM and its 'group' proactive-policy-collector. It also tells me that your producer sends messages to the FTPSTREAM exchange.
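
To make the naming concrete, here is a minimal sketch of how a grouped consumer queue name is composed, assuming the RabbitMQ binder's default `<destination>.<group>` convention with no prefix configured. The class and method names are hypothetical and only illustrate the rule; they are not part of spring-cloud-stream.

```java
// Hypothetical helper, not a spring-cloud-stream API. It only mirrors the
// assumed default naming rule for a consumer binding that has a group:
// queue name = "<destination>.<group>".
class QueueNameSketch {

    static String queueName(String destination, String group) {
        return destination + "." + group;
    }

    public static void main(String[] args) {
        // Properties posted in the question: destination=TOPIC, group=proactive-policy
        System.out.println(queueName("TOPIC", "proactive-policy"));

        // Properties implied by the queue actually seen in the RabbitMQ console
        System.out.println(queueName("FTPSTREAM", "proactive-policy-collector"));
    }
}
```

Under that assumption, the queue you see would correspond to an input binding with destination FTPSTREAM and group proactive-policy-collector, which is worth searching for in every property source the applications load (for example a config server, since the consumer uses @RefreshScope).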

Upvotes: 2
