Guido Krömer

Reputation: 487

Batch / Bulk Message (JMS) Processing with Wildfly

I have a JMS queue that is filled with time series data. To avoid thousands of single-transaction SQL inserts, I want to process the messages in bulk rather than one message at a time in MessageListener.onMessage.

The only solution I have come up with is a scheduled method that fetches a batch of messages from the queue and saves them periodically.

@Stateless
public class SensorDataReceiver {

    private static final int THRESHOLD_IN_SECONDS = 10;

    private static final int QUEUE_TIMEOUT_IN_MILLIS = 1000;

    @Resource(mappedName = "java:jboss/jms/queue/sensorData")
    private Queue queue;

    @Inject
    private JMSContext context;

    @Inject
    private SensorDataDAO sensorDataDAO;

    @SneakyThrows
    @Schedule(hour = "*", minute = "*", second = "*/15", persistent = false)
    public void scheduled() {
        LocalDateTime startUpPlusThreshold = now().plusSeconds(THRESHOLD_IN_SECONDS);
        JMSConsumer consumer = context.createConsumer(queue);

        List<SensorData> sensorDataToInsert = new ArrayList<>();
        do {
            // receive() returns null if no message arrives within the timeout
            ObjectMessage message = (ObjectMessage) consumer.receive(QUEUE_TIMEOUT_IN_MILLIS);

            if (message == null) {
                break;
            }

            sensorDataToInsert.add((SensorData) message.getObject());
        } while (now().isBefore(startUpPlusThreshold) && sensorDataToInsert.size() < 10_000);

        logger.info(format("Got \"%d\" SensorData to persist.", sensorDataToInsert.size()));
        sensorDataDAO.batchSaveOrUpdate(sensorDataToInsert);
        logger.info(format("Persisted \"%d\" SensorData.", sensorDataToInsert.size()));
    }
}

But I do not believe this is the smartest way to do it. When a run finishes faster than the configured interval (I can insert 10k rows in about 2-3 seconds on my test system), I waste time that could be used to process more messages per minute. On the other hand, this code is prone to overlapping scheduled executions.

Upvotes: 3

Views: 639

Answers (1)

Justin Bertram

Reputation: 34988

I would recommend having a pool of stateless beans that are active all the time (i.e. they are not scheduled). Each bean consumes a fixed number of messages (i.e. it does not read until the queue is empty, which would be an arbitrary number of messages) and then inserts the data from those messages in a single database operation.

All the beans in the pool can be active concurrently and can consume & insert their batches as quickly as possible. This will ensure the messages are consumed in a timely fashion which should hopefully avoid any build-up of messages in the queue.

You can set a timeout on receive so that if you reach the end of the queue before the batch size has been hit, the data will still be inserted in a timely fashion.
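To make the "fixed batch size plus receive timeout" idea concrete, here is a minimal sketch in plain Java. It is not WildFly code: a BlockingQueue stands in for the JMS queue, and BatchDrainer and drainBatch are hypothetical names made up for the illustration. The only JMS fact it relies on is that a timed receive returns null when nothing arrives in time, which poll mimics here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper. The BlockingQueue is a stand-in for the JMS queue;
// with JMS, poll(...) would be consumer.receive(timeoutMillis).
final class BatchDrainer {

    private BatchDrainer() {
    }

    /**
     * Collects at most maxBatchSize items. Each poll waits at most
     * timeoutMillis; a timeout (null) ends the batch early so that a
     * partially filled batch is still handed on promptly.
     */
    static <T> List<T> drainBatch(BlockingQueue<T> queue, int maxBatchSize, long timeoutMillis) {
        List<T> batch = new ArrayList<>();
        try {
            while (batch.size() < maxBatchSize) {
                T item = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (item == null) {
                    break; // nothing arrived in time: stop waiting, keep what we have
                }
                batch.add(item);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return whatever was collected so far.
            Thread.currentThread().interrupt();
        }
        return batch;
    }
}
```

The caller would then pass the returned list to a single batch insert (batchSaveOrUpdate in the question) and skip the insert when the list is empty.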

To kick this off when the application server starts, annotate a bean with @Startup and @Singleton and give it a method annotated with @PostConstruct. That method loops enough times to fill your "pool", each time calling the method on your @Stateless bean that receives the batches of messages and processes them.
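The overall shape (a start-up loop that launches N always-active workers, each consuming and inserting batches) can be sketched outside the container roughly as follows. This is a plain-Java stand-in, not EJB code: BatchWorkerPool is a hypothetical class, an ExecutorService plays the role of the bean pool, a BlockingQueue plays the role of the JMS queue, and batchSink is where the single database insert would go. In WildFly, start() would correspond to the @PostConstruct method of the @Startup @Singleton bean, and the call on the @Stateless bean would presumably have to be asynchronous so the loop does not block; that last point is my assumption, the answer does not say so.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for the @Singleton starter plus the pool of @Stateless workers.
final class BatchWorkerPool<T> {

    private final BlockingQueue<T> queue;        // stand-in for the JMS queue
    private final Consumer<List<T>> batchSink;   // stand-in for the single batch insert
    private final int workers;
    private final int batchSize;
    private final long timeoutMillis;
    private final ExecutorService executor;
    private volatile boolean running = true;

    BatchWorkerPool(int workers, BlockingQueue<T> queue, Consumer<List<T>> batchSink,
                    int batchSize, long timeoutMillis) {
        this.workers = workers;
        this.queue = queue;
        this.batchSink = batchSink;
        this.batchSize = batchSize;
        this.timeoutMillis = timeoutMillis;
        this.executor = Executors.newFixedThreadPool(workers);
    }

    // Counterpart of the @PostConstruct loop: start one work loop per pool slot.
    void start() {
        for (int i = 0; i < workers; i++) {
            executor.execute(this::workLoop);
        }
    }

    // Asks the workers to finish once the queue has run dry, then waits for them.
    boolean stop(long waitSeconds) {
        running = false;
        executor.shutdown();
        try {
            return executor.awaitTermination(waitSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private void workLoop() {
        while (true) {
            List<T> batch = new ArrayList<>();
            boolean interrupted = false;
            try {
                // Fill the batch until it is full or a poll times out.
                while (batch.size() < batchSize) {
                    T item = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                    if (item == null) {
                        break;
                    }
                    batch.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                interrupted = true;
            }
            if (!batch.isEmpty()) {
                batchSink.accept(batch); // one insert per batch
            }
            // Exit on interrupt, or when stop() was requested and nothing was left to read.
            if (interrupted || (batch.isEmpty() && !running)) {
                return;
            }
        }
    }
}
```

Because a worker only exits after a poll has timed out on an empty queue, calling stop() lets messages that are already queued be processed first. Whether that is the right shutdown behaviour depends on your delivery guarantees; with JMS you would also have to decide how acknowledgement and the database transaction are tied together, which this sketch leaves out.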

Upvotes: 2
