hakson

Reputation: 146

JSR-352 batch job delays between chunks: how to reduce commit time without bulk delete?

I am running a JSR-352 batch job (concretely, Apache JBatch as implementation) in a JEE environment (WebSphere with EclipseLink as the JPA provider).

The batch processes database records in chunks and deletes entities by ID, subject to a condition. The problem is that after a chunk of entities has been deleted, the next chunk is read only after a delay of several minutes, and the delay grows proportionally to the chunk size.

My job configuration:

The job works but slows down significantly as chunk size increases. My setup includes:

I also tried bulk deletes such as DELETE FROM Entity WHERE id IN (...), but this is not an option for me because they do not cascade deletions to related entities. The entity I want to delete is very complex, and in the logs I can see that deleting it results in deleting rows from about 50 tables (whether this is a good design is another question).
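To illustrate why the bulk route is unattractive here: since a bulk DELETE bypasses the cascade settings, every dependent table would need its own statement, issued child-first so that foreign keys are not violated. The following is only a sketch of that ordering problem, not something taken from the actual job. The class name `BulkDeleteOrder`, the map layout, and any table names passed in are hypothetical, and the dependency graph is assumed to be free of cycles.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy helper: computes a child-first order for bulk DELETE statements. */
final class BulkDeleteOrder {

    private BulkDeleteOrder() {
    }

    /**
     * Returns all tables reachable from root, children before parents.
     * childrenOf maps a table to the tables holding foreign keys to it.
     * Assumption: the dependency graph contains no cycles.
     */
    static List<String> childFirst(String root, Map<String, List<String>> childrenOf) {
        Set<String> ordered = new LinkedHashSet<>();
        visit(root, childrenOf, ordered);
        return new ArrayList<>(ordered);
    }

    private static void visit(String table, Map<String, List<String>> childrenOf, Set<String> ordered) {
        if (ordered.contains(table)) {
            return; // already scheduled through another parent
        }
        for (String child : childrenOf.getOrDefault(table, Collections.<String>emptyList())) {
            visit(child, childrenOf, ordered);
        }
        ordered.add(table); // post-order: added only after all of its children
    }
}
```

With about 50 tables involved, the resulting list would contain about 50 entries, each needing its own hand-written DELETE statement, which is the maintenance burden that makes cascading removal through the EntityManager attractive in the first place.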

Calling em.flush() in my writer after processing each chunk leads to a transaction timeout, so I don't use flush.

I have also observed that calling em.clear() after each chunk, without flush, makes the delay go away, but then the changes are not committed to the database, so clear alone is not correct either. My guess is that the EntityManager is holding too many managed entities, or that database locks or constraints are causing the slowdown. I have tried a lot, but I don't know how to solve this problem.
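The clear-without-flush observation matches how a persistence context treats scheduled removals: remove() only marks the entity, flush() sends the pending statements to the database, and clear() drops whatever has not been flushed yet. The class below is a toy model of just that relationship. It is not the JPA API, `ToyPersistenceContext` and its methods are made up for illustration, and it ignores transactions entirely (in real JPA a flush still needs a commit to become durable).

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Toy model of a persistence context. NOT the JPA API: it only mimics how
 * pending removals relate to flush() and clear().
 */
final class ToyPersistenceContext {

    private final Set<Long> databaseRows;                       // stands in for the table
    private final Set<Long> pendingRemovals = new HashSet<>();  // scheduled, not yet written

    ToyPersistenceContext(Set<Long> initialRows) {
        this.databaseRows = new HashSet<>(initialRows);
    }

    /** Like em.remove(): only schedules the delete. */
    void remove(long id) {
        if (databaseRows.contains(id)) {
            pendingRemovals.add(id);
        }
    }

    /** Like em.flush(): writes the scheduled deletes to the "database". */
    void flush() {
        databaseRows.removeAll(pendingRemovals);
        pendingRemovals.clear();
    }

    /** Like em.clear(): forgets everything that has not been flushed. */
    void clear() {
        pendingRemovals.clear();
    }

    int rowCount() {
        return databaseRows.size();
    }

    int pendingCount() {
        return pendingRemovals.size();
    }
}
```

In this model, calling remove() followed by clear() leaves rowCount() unchanged, while remove(), flush(), clear() shrinks it. That mirrors the behaviour described above: clearing keeps the context small, but on its own it also throws away the deletes.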

Here is the code for my Reader:

import java.io.Serializable;
import java.util.List;

import javax.batch.api.chunk.AbstractItemReader;
import javax.batch.runtime.context.StepContext;
import javax.inject.Inject;
import javax.inject.Named;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

import org.apache.batchee.cdi.scope.StepScoped;

@StepScoped
@Named
public class MyReader extends AbstractItemReader {

    private int chunkSize = 50;
    
    @PersistenceContext
    private EntityManager em;
    
    private List<Long> currentIdsInChunk;
    private int currentBatchPosition = 0;
    
    ...constructor

    ...reading the chunksize from batch properties

    @Override
    public Long readItem() throws Exception {
    
        if (currentIdsInChunk == null || currentBatchPosition >= currentIdsInChunk.size()) {
            LOGGER.info("Loading Ids for the next chunk...");
            currentIdsInChunk = em.createNativeQuery("SELECT e.id FROM Entity e WHERE e.condition = :condition")
                    .setParameter("condition", "SOME_CONDITION")
                    .setMaxResults(chunkSize)
                    .getResultList();
            if (currentIdsInChunk.isEmpty()) {
                return null;
            }
            currentBatchPosition = 0;
        }
        return currentIdsInChunk.get(currentBatchPosition++);
    }


    @Override
    public Serializable checkpointInfo() {
        return currentBatchPosition;
    }
}
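One property of this reader is worth spelling out: the query has no offset, so each reload returns the first chunkSize rows that still match the condition, and progress depends on the previous chunk's deletes being visible to that query. The sketch below reproduces only the control flow of readItem() against an in-memory set. It is not JSR-352 or JPA code, `ReaderSimulation` and its `delete` method are hypothetical, and the sorted set is an assumption (the real query has no ORDER BY, so its row order is not guaranteed).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/**
 * In-memory stand-in for the reader above. NOT JSR-352 or JPA code:
 * a sorted set of IDs plays the role of the table.
 */
final class ReaderSimulation {

    private final TreeSet<Long> matchingIds; // rows that still satisfy the condition
    private final int chunkSize;
    private List<Long> currentIdsInChunk;
    private int currentBatchPosition = 0;
    private int queryCount = 0;

    ReaderSimulation(List<Long> ids, int chunkSize) {
        this.matchingIds = new TreeSet<>(ids);
        this.chunkSize = chunkSize;
    }

    /** Same control flow as readItem(): re-query once the current list is used up. */
    Long readItem() {
        if (currentIdsInChunk == null || currentBatchPosition >= currentIdsInChunk.size()) {
            currentIdsInChunk = firstMatchingIds();
            if (currentIdsInChunk.isEmpty()) {
                return null;
            }
            currentBatchPosition = 0;
        }
        return currentIdsInChunk.get(currentBatchPosition++);
    }

    /** Stands in for the query with setMaxResults(chunkSize): no offset, always the first rows. */
    private List<Long> firstMatchingIds() {
        queryCount++;
        List<Long> result = new ArrayList<>();
        for (Long id : matchingIds) {
            if (result.size() == chunkSize) {
                break;
            }
            result.add(id);
        }
        return result;
    }

    /** Stands in for the writer's deletes becoming visible to the next query. */
    void delete(List<Long> ids) {
        matchingIds.removeAll(ids);
    }

    int queryCount() {
        return queryCount;
    }
}
```

In the simulation, if delete() is never called between reloads, readItem() hands out the same IDs again and never returns null. Whether something comparable happens in the real job depends on how the reader's chunkSize relates to the step's configured item count and on when the deletes are flushed, neither of which is shown here.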

My writer looks like this:

import java.util.List;

import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

import org.apache.batchee.extras.typed.NoStateTypedItemWriter;

@Named
@Dependent
public class MyDeleteWriter implements NoStateTypedItemWriter<Long> {

    @PersistenceContext
    private EntityManager em;
    
    ... constructor

    @Override
    public void writeItems(List<Long> ids) throws Exception {
        int deletedCount = 0;
        for (Long id : ids) {
            Entity entityToDelete = em.find(Entity.class, id);
            if (entityToDelete != null) {
                if (!em.contains(entityToDelete)) {
                    entityToDelete = em.merge(entityToDelete);
                }
                em.remove(entityToDelete);
                deletedCount++;
                
            } else {
                // log entity does not exist
            }
        }
        
    }
}

Upvotes: 1

Views: 37

Answers (0)
