Reputation: 1417
I am using Spring Boot 1.3.0.RELEASE with spring-boot-starter-amqp v 1.3.0. The Rabbit broker is version RabbitMQ 3.5.6, Erlang 18.1.
I have written a small AMQP message listener application that just accepts messages and writes them to a database. It's based heavily on the Messaging with RabbitMQ Spring guide.
My changes were converting the TopicExchange to a FanoutExchange, adding a call to setConcurrentConsumers(), adding JDBC, and removing the code that sends a message.
It works well except for one issue: each time I start it, it misses the first message. The sending app is not restarted, and very reliably, the first message I cause the sending app to send after this app starts is missed. After that it seems to get them all.
Code is pasted below, thanks for your help.
@SpringBootApplication
public class Application implements CommandLineRunner
{
    @Autowired
    AnnotationConfigApplicationContext context;

    @Autowired
    ConfigurationService cs;

    @Bean
    Queue queue()
    {
        return new Queue(cs.getRabbitQueue(), false, false, true);
    }

    @Bean
    Binding binding(Queue queue, FanoutExchange exchange)
    {
        return BindingBuilder.bind(queue).to(exchange);
    }

    // Added by JWA
    @Bean
    public ConnectionFactory connectionFactory()
    {
        CachingConnectionFactory cf = new CachingConnectionFactory(cs.getRabbitHost());
        cf.setUsername(cs.getRabbitUserName());
        cf.setPassword(cs.getRabbitPassword());
        cf.setVirtualHost(cs.getRabbitVirtualHost());
        return cf;
    }

    @Bean
    public FanoutExchange fanout()
    {
        return new FanoutExchange("logs", false, false);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter)
    {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(cs.getRabbitQueue());
        container.setConcurrentConsumers(cs.getRabbitNumListeners());
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    Receiver receiver()
    {
        return new Receiver();
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver)
    {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    public static void main(String[] args) throws InterruptedException
    {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception
    {
        // Nothing to do here
    }
}
public class Receiver
{
    @Autowired
    JdbcTemplate jdbcTemplate;

    private final static String sql = "INSERT INTO msc_reporting_log (eventtime, rectype, userid, prospect_key, userip, userhostname, phase, decision, reason, loghost, sourcehost) values (?, ?, ?, HEXTORAW(?), ?, ?, ?, ?, ?, ?, ?)";
    private String hn;

    public void receiveMessage(String message)
    {
        System.out.println("Received: " + message);
        LogMessage lm = extractJson(message);
        logWithLogger(lm);
        logToDatabase(lm);
    }

    private void logWithLogger(LogMessage lm)
    {
        String msg = "MESSAGE_RECEIVED," + lm;
        Logger.getGlobal().info(msg);
    }

    private void logToDatabase(LogMessage m)
    {
        jdbcTemplate.update(sql, m.getEventTime(), m.getType(), m.getUserId(), m.getUserProspectKey(), m.getUserIP(), m.getUserHostName(), m.getPhase(), m.getDecision(), m.getReason(), hn, m.getSourceHostName());
    }

    private LogMessage extractJson(String m)
    {
        try
        {
            ObjectMapper mapper = new ObjectMapper();
            LogMessage lm = mapper.readValue(m, LogMessage.class);
            return lm;
        }
        catch(JsonMappingException jme)
        {
            Logger.getGlobal().log(Level.SEVERE, "Error mapping JSON", jme);
            jme.printStackTrace();
        }
        catch(JsonParseException jpe)
        {
            Logger.getGlobal().log(Level.SEVERE, "Error parsing JSON", jpe);
            jpe.printStackTrace();
        }
        catch(IOException ioe)
        {
            Logger.getGlobal().log(Level.SEVERE, "IO Error while extracting JSON", ioe);
            ioe.printStackTrace();
        }
        return null;
    }
}
UPDATE
As for the timing of this error: the publisher program is already running before I start this listener. I start the listener, then cause the publisher to send a message, usually by making an incorrect login. The publisher produces the message, and the receiver app seems not to notice it at all. I do it again, and the second message is received.
I have modified the receiver application to use an anonymous queue instead, because I want to run multiple instances of this logger application for redundancy. The issue still happens. Here is the new receiver code, the publisher code and the receiver POM:
Receiver POM:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>edu.xxxxx.ua</groupId>
    <artifactId>DecisionsLogger</artifactId>
    <version>0.1.0</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.3.0.RELEASE</version>
    </parent>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ojdbc7</artifactId>
            <version>12.1.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.7</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>spring-releases</id>
            <name>Spring Releases</name>
            <url>https://repo.spring.io/libs-release</url>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>spring-releases</id>
            <name>Spring Releases</name>
            <url>https://repo.spring.io/libs-release</url>
        </pluginRepository>
    </pluginRepositories>
</project>
New app, has the same problem as the one above:
@SpringBootApplication
public class Application implements CommandLineRunner
{
    @Autowired
    AnnotationConfigApplicationContext context;

    @Autowired
    ConfigurationService cs;

    public String getLocalHostname()
    {
        return cs.getLocalHostName();
    }

    @Bean
    Queue queue()
    {
        //return new Queue(cs.getRabbitQueue(), false, false, true);
        return new AnonymousQueue();
    }

    @Bean
    Binding binding(FanoutExchange exchange)
    {
        return BindingBuilder.bind(queue()).to(exchange);
    }

    @Bean
    public ConnectionFactory connectionFactory()
    {
        CachingConnectionFactory cf = new CachingConnectionFactory(cs.getRabbitHost());
        cf.setUsername(cs.getRabbitUserName());
        cf.setPassword(cs.getRabbitPassword());
        cf.setVirtualHost(cs.getRabbitVirtualHost());
        return cf;
    }

    @Bean
    public FanoutExchange fanout()
    {
        return new FanoutExchange(cs.getRabbitExchange(), false, false);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory)
    {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(cs.getRabbitMinListeners());
        factory.setMaxConcurrentConsumers(cs.getRabbitMaxListeners());
        return factory;
    }

    @Bean
    Receiver receiver()
    {
        return new Receiver();
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver)
    {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    public static void main(String[] args) throws InterruptedException
    {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception
    {
        while(true) {}
        // Nothing to do here
    }
}
Publisher code:
@Autowired
private RabbitTemplate rabbitTemplate;
Elsewhere in the same class as the @Autowired above:
private void send(Message m)
{
    if(!isActive)
        return;
    if(rabbitTemplate == null)
    {
        DecLogger.DEC.fine(() -> "Unable to send Rabbit Message - rabbitTemplate is null");
        return;
    }
    if(configSvc.getRabbitQueue() == null)
    {
        DecLogger.DEC.fine(() -> "Unable to send Rabbit Message - queueName is null");
        return;
    }
    ObjectMapper mapper = new ObjectMapper();
    String time = ZonedDateTime.now().toString();
    m.setEventTime(time);
    try
    {
        String tmpStr = mapper.writeValueAsString(m);
        rabbitTemplate.convertAndSend(configSvc.getRabbitExchange(), configSvc.getRabbitQueue(), tmpStr);
        DecLogger.DEC.finest(() -> "Sent Rabbit Message: " + tmpStr);
    }
    catch(Exception e)
    {
        DecLogger.DEC.fine(() -> "Failed sending Rabbit Message");
        DecLogger.DEC.fine(() -> "Exception: " + e);
    }
}
The Message class used above is not from Spring Framework (I should rename it):
abstract class Message
{
    @JsonProperty
    private String eventTime;

    @JsonProperty
    private String type;

    @JsonProperty
    protected String sourceHostName;

    public Message(String type, String sourceHostName)
    {
        setType(type);
        setSourceHostName(sourceHostName);
    }

    public void setEventTime(String time)
    {
        this.eventTime = time;
    }

    private void setType(String type)
    {
        this.type = type;
    }

    private void setSourceHostName(String sourceHostName)
    {
        this.sourceHostName = sourceHostName;
    }
}
ISSUE RESOLVED
The problem turned out to be on the producer side. This line:
rabbitTemplate.convertAndSend(configSvc.getRabbitExchange(), configSvc.getRabbitQueue(), tmpStr);
was replaced with this line:
rabbitTemplate.convertAndSend(configSvc.getRabbitExchange(), "", tmpStr);
The only change being the second parameter. Apparently providing the routing key in the second parameter of convertAndSend() caused the issue.
Upvotes: 2
Views: 1403
Reputation: 174554
What you are seeing makes no sense to me; my first step would be to turn on DEBUG logging and watch the message flow - so you can see if the message arrives at the container at all. If not, you need to look elsewhere.
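As a minimal sketch of turning that logging on, assuming the app uses Boot's default logging setup and an application.properties file on the classpath, one line should be enough:

```properties
# Log Spring AMQP internals (listener container, connections, message delivery) at DEBUG
logging.level.org.springframework.amqp=DEBUG
```

With this in place, each delivery that reaches the listener container should show up in the log, so a missing first message can be narrowed down to either the broker/publisher side or the listener side.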
Another thing to look for is a message sitting in unacked state in the management console. That would imply there's another consumer someplace; you can see the consumers on the management console.
Aside: Why are you mixing boot jar versions? I am not suggesting that's the issue, but I'd use the 1.2.7 starter with boot 1.2.7 or switch everything to 1.3.0.
I see the problem:
return new Queue(cs.getRabbitQueue(), false, false, true);
This creates an auto-delete queue. So when your app is not running, the queue disappears and publishing to the fanout does nothing.
The RabbitAdmin provided by Boot will re-create the queue when the app starts.
Try:
return new Queue(cs.getRabbitQueue());
Then the queue will remain after the app stops.
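To make the auto-delete point concrete, here is a toy in-memory model in plain Java. It is not RabbitMQ or Spring AMQP code: the class FanoutModel and its methods are hypothetical names invented for this illustration, and it assumes a single consumer per queue. It only mimics two broker behaviours: a fanout exchange copies each message to every queue bound at publish time (ignoring the routing key), and an auto-delete queue goes away, along with its binding, when its consumer disconnects.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Toy model of one fanout exchange and its bound queues (illustration only). */
public class FanoutModel {
    // Queue name -> messages waiting in that queue; every entry counts as bound.
    private final Map<String, Queue<String>> boundQueues = new HashMap<>();
    // Queue name -> whether the queue was declared auto-delete.
    private final Map<String, Boolean> autoDelete = new HashMap<>();

    /** Declares a queue and binds it to the exchange; re-declaring keeps existing messages. */
    public void declareAndBind(String name, boolean autoDeleteFlag) {
        boundQueues.putIfAbsent(name, new ArrayDeque<>());
        autoDelete.put(name, autoDeleteFlag);
    }

    /** The only consumer goes away: an auto-delete queue is removed, others stay. */
    public void consumerDisconnected(String name) {
        if (Boolean.TRUE.equals(autoDelete.get(name))) {
            boundQueues.remove(name);
            autoDelete.remove(name);
        }
    }

    /** Fanout routing: the routing key is ignored; returns how many queues got a copy. */
    public int publish(String routingKey, String body) {
        for (Queue<String> q : boundQueues.values()) {
            q.add(body);
        }
        return boundQueues.size();
    }

    /** Number of messages waiting in the named queue (0 if the queue does not exist). */
    public int depth(String name) {
        Queue<String> q = boundQueues.get(name);
        return q == null ? 0 : q.size();
    }

    public static void main(String[] args) {
        FanoutModel broker = new FanoutModel();

        // Auto-delete queue: once the listener app stops, nothing is bound any more.
        broker.declareAndBind("logs.queue", true);
        broker.consumerDisconnected("logs.queue");
        System.out.println("queues reached while app is down (auto-delete): "
                + broker.publish("ignored-key", "m1"));

        // Non-auto-delete queue: it survives the disconnect and buffers the message.
        broker.declareAndBind("logs.queue", false);
        broker.consumerDisconnected("logs.queue");
        System.out.println("queues reached while app is down (regular): "
                + broker.publish("", "m2"));
        System.out.println("messages waiting for the app: " + broker.depth("logs.queue"));
    }
}
```

In the first case the message published while the listener is down has nowhere to go; in the second it waits in the queue until the listener reconnects. Whether this matches the symptom in the question depends on when the message is actually published relative to the listener starting.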
Upvotes: 0