Reputation: 1163
I have one ActiveMQ server instance running, with several data sources pushing data into two queues, plus one DLQ for messages whose consumption fails. I use Apache Camel to consume and process the messages from these queues and want to write them to InfluxDB.
However, so far I have failed to get the stack running in such a way that Apache Camel consumes all queues in parallel. I always run into errors like this one:
ERROR 20831 --- [r[ActiveMQ.DLQ]]
c.c.j.DefaultJmsMessageListenerContainer : Could not refresh JMS
Connection for destination 'ActiveMQ.DLQ' - retrying using
FixedBackOff{interval=5000, currentAttempts=270,
maxAttempts=unlimited}. Cause: Broker: localhost - Client: Influx
Message Queue already connected from tcp://ip-of-machine-running-route:port
How can I consume from multiple queues by one Apache Camel instance?
I tried two approaches:
Currently, my code looks like this:
Camel Config
/**
 * Spring configuration that supplies the shutdown strategy bean used by Camel.
 */
@Configuration
public class CamelConfig {

    /** Timeout handed to the shutdown strategy; the log message reports it in seconds. */
    private static final int SHUTDOWN_TIMEOUT_SECONDS = 1200; // TODO make it configurable

    /**
     * Creates a {@link DefaultShutdownStrategy} with its timeout set to
     * {@link #SHUTDOWN_TIMEOUT_SECONDS}.
     *
     * @return the configured shutdown strategy
     */
    @Bean
    public ShutdownStrategy shutdownStrategy() {
        // Thread.currentThread().getStackTrace()[0] is the getStackTrace() frame itself on
        // common JVMs, so it would log "getStackTrace"; name this method explicitly instead.
        final String methodName = "shutdownStrategy";

        MessageLogger.logInfo(getClass(), "Camel Route: STARTING...", methodName);
        DefaultShutdownStrategy strategy = new DefaultShutdownStrategy();
        MessageLogger.logInfo(getClass(),
                "Camel Route: Timeout for shutdown: " + SHUTDOWN_TIMEOUT_SECONDS + " seconds.",
                methodName);
        strategy.setTimeout(SHUTDOWN_TIMEOUT_SECONDS);
        return strategy;
    }
}
ActiveMQ client config
/**
 * Spring configuration that supplies the ActiveMQ connection factory bean.
 */
@Configuration
public class ActiveMqClientConfig {

    /**
     * Builds an {@link ActiveMQConnectionFactory} pointing at the broker, with credentials,
     * synchronous sends, a client ID and a connect-response timeout applied.
     *
     * @return the configured connection factory
     */
    @Bean
    public ActiveMQConnectionFactory registerActiveMQConnectionFactory() {
        // Thread.currentThread().getStackTrace()[0] is the getStackTrace() frame itself on
        // common JVMs, so it would log "getStackTrace"; name this method explicitly instead.
        final String methodName = "registerActiveMQConnectionFactory";

        MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTING...", methodName);
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://servername:61616");
        connectionFactory.setUserName(username);
        connectionFactory.setPassword(passwd);
        connectionFactory.setUseAsyncSend(false);
        // NOTE(review): every connection created by this factory presents the same fixed
        // client ID. The broker accepts only one connection per client ID and rejects the
        // others ("... already connected from ..."), so several consumers cannot share it.
        connectionFactory.setClientID("Influx Message Queue");
        connectionFactory.setConnectResponseTimeout(300);
        MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTED", methodName);
        return connectionFactory;
    }
}
Influx Config
/**
 * Spring configuration that supplies the OkHttp client builder provider for the InfluxDB client.
 */
@Configuration
public class InfluxDBClientConfig {

    /**
     * Registers a provider whose builder uses 1200-second read, write and connect timeouts
     * and retries on connection failure.
     *
     * @return a provider that creates a freshly configured builder on each call
     */
    @Bean
    public InfluxDbOkHttpClientBuilderProvider registerInfluxDbOkHttpClientBuilderProvider() {
        // Thread.currentThread().getStackTrace()[0] is the getStackTrace() frame itself on
        // common JVMs, so it would log "getStackTrace"; name the bean method explicitly instead.
        final String methodName = "registerInfluxDbOkHttpClientBuilderProvider";

        return () -> {
            MessageLogger.logInfo(getClass(), "InfluxDB Client: STARTING...", methodName);
            Builder builder = new OkHttpClient.Builder() //
                    .readTimeout(1200, TimeUnit.SECONDS) //
                    .writeTimeout(1200, TimeUnit.SECONDS) //
                    .connectTimeout(1200, TimeUnit.SECONDS) //
                    .retryOnConnectionFailure(true);
            MessageLogger.logInfo(getClass(), "InfluxDB Client: STARTED - " + builder, methodName);
            return builder;
        };
    }
}
Component with multiple routes:
@Component
public class ActiveMqToInfluxRoute extends RouteBuilder {
@Autowired
private FrameworkConfig frameworkConfig;
@Override
public void configure() throws Exception {
String consumerQueueq = "activemq:queue:queue1?" //
+ "brokerURL=tcp://ip:port";
String consumerActiveMqDLQ = "activemq:queue:ActiveMQ.DLQ?" //
+ "brokerURL=tcp://ip:port";
String consumerQueue2 = "activemq:queue:queue2?" //
+ "brokerURL=tcp://ip:port";
String emitterInfluxDB = "influxdb://influxDb?databaseName=databaseName"
+ "&batch=true" //
+ "&retentionPolicy=retentionPolicy"
String emitterStreamOut = "stream:out";
//************************************************************************
// Data from cryring_db_inbound to InfluxDB
//************************************************************************
from(consumerCryringInbound) //
.process(messagePayload -> {
Message message = messagePayload.getIn();
if (message.getBody(String.class).toString().startsWith("@MultiRecords")) {
Processor.processMessage(message.getBody(String.class), message);
} else {
Processor.processMessage(message);
}
})//
.to(emitterInfluxDB) //
.onException(Exception.class) //
.useOriginalMessage() //
.handled(true) //
.log("error") //
.to(emitterStreamOut);
//************************************************************************
// Data from cryring_db_inbound to InfluxDB
//************************************************************************
from(consumerActiveMqDLQ) //
.process(messagePayload -> {
Message message = messagePayload.getIn();
if (message.getBody(String.class).toString().startsWith("@MultiRecords")) {
Processor.processMessage(message.getBody(String.class), message);
} else {
Processor.processMessage(message);
}
})//
.to(emitterInfluxDB) //
.onException(Exception.class) //
.useOriginalMessage() //
.handled(true) //
.log("error") //
.to(emitterStreamOut);
//************************************************************************
// Data from olog_inbound to olog
//************************************************************************
from(consumerOlog) //
.process(messagePayload -> {
System.out.println(messagePayload.getIn());
}) //
.to(emitterStreamOut);
}
}
Upvotes: 1
Views: 3228
Reputation: 1163
In contrast to what "Camel Multiple Consumers Implementation Issue" suggests (multiple from().to() routes in one @Component), I managed to make this work by splitting the routes into individual components, each with its own clientId. Furthermore, I replaced the static client ID in the ActiveMQ configuration with a UUID. Code:
ActiveMQ-Config:
/**
 * Spring configuration that supplies the ActiveMQ connection factory bean.
 */
@Configuration
public class ActiveMqClientConfig {

    /**
     * Builds an {@link ActiveMQConnectionFactory} pointing at the broker, with credentials,
     * synchronous sends, a randomly generated client ID and a connect-response timeout applied.
     *
     * @return the configured connection factory
     */
    @Bean
    public ActiveMQConnectionFactory registerActiveMQConnectionFactory() {
        // Thread.currentThread().getStackTrace()[0] is the getStackTrace() frame itself on
        // common JVMs, so it would log "getStackTrace"; name this method explicitly instead.
        final String methodName = "registerActiveMQConnectionFactory";

        MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTING...", methodName);
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://servername:port");
        connectionFactory.setUserName(username);
        connectionFactory.setPassword(password);
        connectionFactory.setUseAsyncSend(false);
        // A random UUID replaces the former static client ID so it does not collide
        // with a client ID that is already connected to the broker.
        connectionFactory.setClientID(UUID.randomUUID().toString());
        connectionFactory.setConnectResponseTimeout(300);
        MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTED", methodName);
        return connectionFactory;
    }
}
Route-Components:
/**
 * Camel route that consumes from {@code queue1} with its own client ID and writes to InfluxDB.
 */
@Component
public class ActiveMqToInfluxRoute extends RouteBuilder {

    @Autowired
    private FrameworkConfig frameworkConfig;

    /**
     * Defines the route together with a route-scoped {@code onException} that marks any
     * exception as handled, logs {@code "error"} and sends the original message to
     * {@code stream:out}.
     *
     * @throws Exception if route definition fails
     */
    @Override
    public void configure() throws Exception {
        String consumerCryringInbound = "activemq:queue:queue1?"
                + "brokerURL=tcp://activemq-server-ip:port"
                // '&' separates URI options; without it clientId is glued onto the broker URL
                + "&clientId=clientid1";
        String emitterInfluxDB = "influxdb://influxDb?databaseName=influx_db_name"
                + "&batch=true"
                + "&retentionPolicy=retentionPolicy";
        String emitterStreamOut = "stream:out";

        //************************************************************************
        // Data from cryring_db_inbound to InfluxDB
        //************************************************************************
        from(consumerCryringInbound)
                // Route-scoped error handling goes directly after from() and is closed with
                // end(), so it is not mistaken for part of the main pipeline.
                .onException(Exception.class)
                    .useOriginalMessage()
                    .handled(true)
                    .log("error")
                    .to(emitterStreamOut)
                .end()
                // "processor code" is a placeholder from the original post: put the actual
                // processor (lambda or instance) here.
                .process(processor code)
                .to(emitterInfluxDB);
    }
}
... similar for other routes, each with individual clientId.
Upvotes: 0
Reputation:
Only one client at a time can use a given ClientID. Client IDs must be unique, and they are probably not something you want to set manually. Another option might be to set the ClientIDPrefix, which gives you a better way to identify which application is consuming.
Upvotes: 1