Reputation: 487
I have a JMS queue which gets filled with time series data. To avoid thousands of single-transaction SQL inserts, I want to process the messages in bulk rather than one at a time via MessageListener's onMessage.
The only solution I have come up with is a scheduled job that periodically fetches a batch of messages from the queue and saves them:
@Stateless
public class SensorDataReceiver {

    private static final int THRESHOLD_IN_SECONDS = 10;
    private static final int QUEUE_TIMEOUT_IN_MILLIS = 1000;

    private static final Logger logger = Logger.getLogger(SensorDataReceiver.class.getName());

    @Resource(mappedName = "java:jboss/jms/queue/sensorData")
    private Queue queue;

    @Inject
    private JMSContext context;

    @Inject
    private SensorDataDAO sensorDataDAO;

    @SneakyThrows
    @Schedule(hour = "*", minute = "*", second = "*/15", persistent = false)
    public void scheduled() {
        LocalDateTime startPlusThreshold = now().plusSeconds(THRESHOLD_IN_SECONDS);
        JMSConsumer consumer = context.createConsumer(queue);
        List<SensorData> sensorDataToInsert = new ArrayList<>();
        do {
            ObjectMessage message = (ObjectMessage) consumer.receive(QUEUE_TIMEOUT_IN_MILLIS);
            if (message == null) {
                break; // reached the end of the queue
            }
            sensorDataToInsert.add((SensorData) message.getObject());
        } while (now().isBefore(startPlusThreshold) && sensorDataToInsert.size() < 10_000);
        logger.info(format("Got \"%d\" SensorData to persist.", sensorDataToInsert.size()));
        sensorDataDAO.batchSaveOrUpdate(sensorDataToInsert);
        logger.info(format("Persisted \"%d\" SensorData.", sensorDataToInsert.size()));
    }
}
But I do not believe this is the smartest way of doing this: on the one hand, I waste time that could be spent processing more messages whenever the batch completes faster than the configured interval (I can insert 10k rows in about 2-3 seconds on my test system); on the other hand, this code is prone to overlapping scheduled executions.
Upvotes: 3
Views: 639
Reputation: 34988
I would recommend having a pool of stateless beans that are active all the time (i.e. not scheduled), each consuming a fixed number of messages (rather than draining until the queue is empty, which would be an arbitrary number of messages) and then inserting the data from those messages in a single database operation.
All the beans in the pool can be active concurrently and can consume and insert their batches as quickly as possible. This ensures the messages are consumed in a timely fashion, which should avoid any build-up of messages in the queue.
You can have a timeout on receive so that if you do reach the end of the queue before the batch size has been hit, the data will still be inserted in a timely fashion.
In order to kick this off when the application server starts, you can annotate a bean with @Startup and @Singleton, and then annotate a method with @PostConstruct which loops enough times to fill your "pool" and calls the method on your @Stateless bean which will receive the batches of messages and process them.
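A minimal sketch of the receive loop each pooled bean could run. This is plain Java so it is runnable as-is: the `BlockingQueue` and the names `drainBatch`/`maxBatchSize` are illustrative stand-ins for the `JMSConsumer` (whose `receive(timeout)` likewise returns null on timeout), not part of any EE API. The loop stops either when the batch fills up or when polling times out at the end of the queue, so a partial batch is still inserted promptly:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BatchDrainSketch {

    // Stand-in for the JMSConsumer.receive(timeout) loop: poll returns null on timeout,
    // which signals the end of the queue and flushes whatever partial batch we have.
    static <T> List<T> drainBatch(BlockingQueue<T> queue, int maxBatchSize, long timeoutMillis)
            throws InterruptedException {
        List<T> batch = new ArrayList<>(maxBatchSize);
        while (batch.size() < maxBatchSize) {
            T item = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (item == null) {
                break; // reached the end of the queue before the batch filled up
            }
            batch.add(item);
        }
        return batch; // caller would now insert this batch in a single database operation
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 25; i++) {
            queue.add(i);
        }
        // Two full batches of 10, then a partial batch of 5 once the queue runs dry.
        System.out.println(drainBatch(queue, 10, 100).size());
        System.out.println(drainBatch(queue, 10, 100).size());
        System.out.println(drainBatch(queue, 10, 100).size());
    }
}
```

In the EE wiring described above, each @Stateless bean would run this kind of loop against its own consumer and hand the returned batch to the DAO; the @Singleton @Startup bean merely launches enough of these loops to fill the pool.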
Upvotes: 2