Cornel

Reputation: 79

RabbitMQ queue is empty even if message was published

I am trying to create an app using RabbitMQ, and I am stuck and do not know what to do. I followed a tutorial and everything should have worked, but it doesn't. Whether I send a Postman request or publish from the RabbitMQ portal at localhost:15672, the "message published" pop-up appears and I get this text in the console, but when I check the queue, it says it is empty.

The console output shows clearly that the message reached the consumer, but when I check the queue in the portal, the message is not there.

I will show the entire code now.

CONFIG

package ro.tuc.ds2020.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Value("${rabbitmq.queue.name}")
    private String queue;

    @Value("${rabbitmq.queue.json.name}")
    private String jsonQueue;

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_one}")
    private String routingKeyOne;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    @Bean
    public Queue queue() {
        return new Queue(queue);
    }

    @Bean
    public Queue jsonQueue() {
        return new Queue(jsonQueue, true);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchange, true, false);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(routingKeyOne);
    }

    @Bean
    public Binding jsonBinding() {
        return BindingBuilder.bind(jsonQueue()).to(exchange()).with(routingKeyJson);
    }

    @Bean
    public MessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());
        return rabbitTemplate;
    }
}

CONSUMER

package ro.tuc.ds2020.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonConsumer.class);

    @RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
    public void consumeJsonMessage(MeasurementDTO measurementDTO) {
        LOGGER.info(String.format("Received JSON message here -> %s", measurementDTO.toString()));
    }

}

CONTROLLER (used when I send the request from Postman)

package ro.tuc.ds2020.controllers;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import ro.tuc.ds2020.dtos.MeasurementDTO;
import ro.tuc.ds2020.publisher.RabbitMQJsonProducer;

@RequestMapping("/api/v1")
@RestController
@CrossOrigin(origins = "http://localhost:4200", allowCredentials = "true")
public class MessageJsonController {

    private RabbitMQJsonProducer jsonProducer;

    public MessageJsonController(RabbitMQJsonProducer rabbitMQJsonProducer) {
        this.jsonProducer = rabbitMQJsonProducer;
    }

    @PostMapping("/publish")
    public ResponseEntity<String> sendJsonMessage(@RequestBody MeasurementDTO measurementDTO) {
        jsonProducer.sendJsonMessage(measurementDTO);
        return  ResponseEntity.ok("Json message sent to RabbitMQ ...");
    }
}

PUBLISHER

package ro.tuc.ds2020.publisher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonProducer {

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonProducer.class);

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMQJsonProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendJsonMessage(MeasurementDTO measurementDTO) {
        LOGGER.info(String.format("Json message sent -> %s", measurementDTO.toString()));
        rabbitTemplate.convertAndSend(exchange, routingKeyJson, measurementDTO);
    }

}

And here is the application.properties

spring.rabbitmq.host = localhost
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest

rabbitmq.queue.name = queue_1
rabbitmq.queue.json.name = queue_json
rabbitmq.queue.exchange = exchange
rabbitmq.queue.routing_key_one = routing_key_1
rabbitmq.queue.routing_key_json = routing_key_json

Upvotes: 1

Views: 61

Answers (1)

Bench Vue

Reputation: 9390

Java often presents more challenges in setting up a proper working environment compared to other languages like Node.js or Python, which are generally easier to configure.

Requirements

Maven 3.9.9 and JDK 17

> mvn --version
Apache Maven 3.9.9 (8e8579a9e76f7d015ee5ec7bfcdc97d260186937)
Maven home: C:\Users\benchvue\maven\apache-maven-3.9.9
Java version: 17.0.12, vendor: Amazon.com Inc., runtime: C:\Program Files\Amazon Corretto\jdk17.0.12_7
Default locale: en_US, platform encoding: Cp1252
OS name: "windows 11", version: "10.0", arch: "amd64", family: "windows"

File Tree

C:.
│   docker-compose.yml
│   pom.xml
│
├───.idea
│       .gitignore
│       compiler.xml
│       encodings.xml
│       jarRepositories.xml
│       misc.xml
│
└───src
    └───main
        ├───java
        │   └───ro
        │       └───tuc
        │           └───ds2020
        │               │   Ds2020Application.java
        │               │
        │               ├───config
        │               │       RabbitMQConfig.java
        │               │
        │               ├───consumer
        │               │       RabbitMQJsonConsumer.java
        │               │
        │               ├───controllers
        │               │       MessageJsonController.java
        │               │
        │               ├───dtos
        │               │       MeasurementDTO.java
        │               │
        │               └───publisher
        │                       RabbitMQJsonProducer.java
        │
        └───resources
            │   application.properties
            │
            └───static

RabbitMQConfig.java

package ro.tuc.ds2020.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Value("${rabbitmq.queue.name}")
    private String queue;

    @Value("${rabbitmq.queue.json.name}")
    private String jsonQueue;

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_one}")
    private String routingKeyOne;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    @Bean
    public Queue queue() {
        return new Queue(queue);
    }

    @Bean
    public Queue jsonQueue() {
        return new Queue(jsonQueue, true);
    }

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchange, true, false);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with(routingKeyOne);
    }

    @Bean
    public Binding jsonBinding() {
        return BindingBuilder.bind(jsonQueue()).to(exchange()).with(routingKeyJson);
    }

    @Bean
    public MessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());
        return rabbitTemplate;
    }
}

RabbitMQJsonConsumer.java

package ro.tuc.ds2020.consumer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonConsumer.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
    public void consumeJsonMessage(MeasurementDTO measurementDTO) {
        try {
            String jsonMessage = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(measurementDTO);
            LOGGER.info("Received JSON message here -> \n{}", jsonMessage);
        } catch (JsonProcessingException e) {
            LOGGER.error("Failed to convert message to JSON", e);
        }
    }
}

MessageJsonController.java

package ro.tuc.ds2020.controllers;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import ro.tuc.ds2020.dtos.MeasurementDTO;
import ro.tuc.ds2020.publisher.RabbitMQJsonProducer;

import java.util.HashMap;
import java.util.Map;

@RequestMapping("/api/v1")
@RestController
public class MessageJsonController {

    private final RabbitMQJsonProducer jsonProducer;

    public MessageJsonController(RabbitMQJsonProducer rabbitMQJsonProducer) {
        this.jsonProducer = rabbitMQJsonProducer;
    }

    @PostMapping("/publish")
    public ResponseEntity<Map<String, String>> sendJsonMessage(@RequestBody MeasurementDTO measurementDTO) {
        jsonProducer.sendJsonMessage(measurementDTO);

        // Create a JSON response body
        Map<String, String> response = new HashMap<>();
        response.put("message", "Json message sent to RabbitMQ");
        response.put("status", "success");

        return ResponseEntity.ok(response);
    }
}

MeasurementDTO.java

package ro.tuc.ds2020.dtos;

import com.fasterxml.jackson.annotation.JsonProperty;

public class MeasurementDTO {

    @JsonProperty("sensorId")
    private String sensorId;

    @JsonProperty("value")
    private double value;

    @JsonProperty("unit")
    private String unit;

    @JsonProperty("timestamp")
    private String timestamp;

    // Getters and Setters
    public String getSensorId() {
        return sensorId;
    }

    public void setSensorId(String sensorId) {
        this.sensorId = sensorId;
    }

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public String getUnit() {
        return unit;
    }

    public void setUnit(String unit) {
        this.unit = unit;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "MeasurementDTO{" +
                "sensorId='" + sensorId + '\'' +
                ", value=" + value +
                ", unit='" + unit + '\'' +
                ", timestamp='" + timestamp + '\'' +
                '}';
    }
}

RabbitMQJsonProducer.java

package ro.tuc.ds2020.publisher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import ro.tuc.ds2020.dtos.MeasurementDTO;

@Service
public class RabbitMQJsonProducer {

    @Value("${rabbitmq.queue.exchange}")
    private String exchange;

    @Value("${rabbitmq.queue.routing_key_json}")
    private String routingKeyJson;

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonProducer.class);

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMQJsonProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendJsonMessage(MeasurementDTO measurementDTO) {
        LOGGER.info("Json message sent -> {}", measurementDTO);
        rabbitTemplate.convertAndSend(exchange, routingKeyJson, measurementDTO);
    }
}

Ds2020Application.java

package ro.tuc.ds2020;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Ds2020Application {
    public static void main(String[] args) {
        SpringApplication.run(Ds2020Application.class, args);
    }
}

application.properties

spring.rabbitmq.host = localhost
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest

rabbitmq.queue.name = queue_1
rabbitmq.queue.json.name = queue_json
rabbitmq.queue.exchange = exchange
rabbitmq.queue.routing_key_one = routing_key_1
rabbitmq.queue.routing_key_json = routing_key_json

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>ro.tuc</groupId>
    <artifactId>ds2020</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring Boot Starter AMQP -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- Jackson for JSON serialization -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- Spring Boot Starter Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven Plugin -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*</include>
                </includes>
            </resource>
        </resources>        
    </build>
</project>

docker-compose.yml

version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672" # RabbitMQ messaging port
      - "15672:15672" # RabbitMQ management UI
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest

Launching RabbitMQ

docker compose up

Access RabbitMQ UI

username: guest
password: guest
http://localhost:15672/#/

Compile jar

mvn clean install

dir target

Launching Java project

java -jar target/ds2020-1.0.0.jar

Call REST API by Postman

POST http://localhost:8080/api/v1/publish

Input Body

{
    "sensorId": "12345",
    "value": 67.5,
    "unit": "Celsius",
    "timestamp": "2024-11-16T18:30:00Z"
}
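
If Postman is not at hand, the same request can probably be sent from plain Java. The sketch below uses the JDK's built-in java.net.http.HttpClient (Java 11 or later). The class name PublishClient and the base URL http://localhost:8080 are assumptions for illustration; only the path and the body come from the request above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PublishClient {

    // Same JSON body as the Postman request above.
    static final String BODY = "{"
            + "\"sensorId\": \"12345\", "
            + "\"value\": 67.5, "
            + "\"unit\": \"Celsius\", "
            + "\"timestamp\": \"2024-11-16T18:30:00Z\""
            + "}";

    /** Builds the POST request; baseUrl is an assumption (Spring Boot's default port 8080). */
    public static HttpRequest buildRequest(String baseUrl) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/v1/publish"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(BODY))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        try {
            HttpResponse<String> response = client.send(
                    buildRequest("http://localhost:8080"),
                    HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        } catch (IOException e) {
            // The application is probably not running on localhost:8080.
            System.out.println("Request failed: " + e.getMessage());
        }
    }
}
```

The Content-Type header matters here: the controller reads the body with @RequestBody, so the request has to declare it as JSON.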

On the Java side, the consumer logs the received message in the Spring log:

2024-11-16 19:11:43.964  INFO 22464 --- [ntContainer#0-1] r.t.d.consumer.RabbitMQJsonConsumer      : Received JSON message here ->
{
  "sensorId" : "12345",
  "value" : 67.5,
  "unit" : "Celsius",
  "timestamp" : "2024-11-16T18:30:00Z"
}

You can see the spike in the RabbitMQ UI.

If you want to see the queued message in the RabbitMQ UI, you need to comment out the @RabbitListener annotation in RabbitMQJsonConsumer.java.

From

@RabbitListener(queues = {"${rabbitmq.queue.json.name}"})

To

//@RabbitListener(queues = {"${rabbitmq.queue.json.name}"})

Then rebuild the jar and run it again.

  • The @RabbitListener annotation makes the consumer consume messages from the queue automatically as soon as they arrive.
  • Once the consumer has processed and acknowledged a message, RabbitMQ removes it from the queue, which leaves the queue empty.
  • With @RabbitListener commented out, no consumer is attached, so messages remain in the queue for inspection.
  • An empty queue therefore does not mean that messages were lost; they were delivered and processed immediately.
  • For debugging, disabling the consumer lets you verify the message flow and the queue content in RabbitMQ.
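
The points above can be illustrated with a toy in-memory model. This is only a sketch: ToyQueue and depthAfterPublish are hypothetical names, not part of RabbitMQ or Spring AMQP, and the model ignores acknowledgements, prefetch, and threading. It only shows why the queue depth reads 0 when a listener is attached and 1 when it is not.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Toy in-memory model of a single queue; NOT the RabbitMQ client API. */
public class QueueDepthDemo {

    /** Hypothetical stand-in for a broker queue with at most one listener. */
    static final class ToyQueue {
        private final Deque<String> ready = new ArrayDeque<>();
        private Consumer<String> listener; // null means no consumer is attached

        void attach(Consumer<String> newListener) {
            this.listener = newListener;
            // Deliver anything that was waiting before the consumer attached.
            while (!ready.isEmpty()) {
                newListener.accept(ready.poll());
            }
        }

        void publish(String message) {
            if (listener != null) {
                listener.accept(message); // delivered right away, never left waiting
            } else {
                ready.add(message);       // waits in the queue until someone consumes it
            }
        }

        int depth() {
            return ready.size();
        }
    }

    /** Publishes one message and reports the queue depth afterwards. */
    public static int depthAfterPublish(boolean consumerAttached, List<String> log) {
        ToyQueue queue = new ToyQueue();
        if (consumerAttached) {
            queue.attach(message -> log.add("Received -> " + message));
        }
        queue.publish("{\"sensorId\":\"12345\"}");
        return queue.depth();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println("with listener:    depth=" + depthAfterPublish(true, log) + ", log=" + log);
        System.out.println("without listener: depth=" + depthAfterPublish(false, new ArrayList<>()));
    }
}
```

In the real application, commenting out the annotation is not the only way to pause the consumer. Spring AMQP's @RabbitListener also has an autoStartup attribute, which may be a gentler option; check the documentation for your version before relying on it.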

Good Luck!

Upvotes: 0
