Shashank

Reputation: 782

How to use an aggregation query with MongoItemReader in Spring Batch

The requirement has changed, and I now have to use an aggregation query instead of the basic query in setQuery(). Is this even possible? If so, how can I do it? My aggregation query is ready, but I am not sure how to use it in Spring Batch.

public ItemReader<MyCollection> searchMongoItemReader() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {

        MongoItemReader<MyCollection> mongoItemReader = new MongoItemReader<>();
        mongoItemReader.setTemplate(myMongoTemplate);
        mongoItemReader.setCollection(myMongoCollection);

        mongoItemReader.setQuery(" Some Simple Query - Basic");

        mongoItemReader.setTargetType(MyCollection.class);
        Map<String, Sort.Direction> sort = new HashMap<>();
        sort.put("field4", Sort.Direction.ASC);
        mongoItemReader.setSort(sort);
        return mongoItemReader;

    }

Upvotes: 7

Views: 5854

Answers (2)

Fabio Almeida

Reputation: 146

To use aggregation in a job while still benefiting from all the features Spring Batch offers, you have to create a custom ItemReader. By extending AbstractPaginatedDataItemReader, the reader inherits the paging state (page and pageSize) that the built-in paginated readers use.

Here's a sample of that custom class:

public class CustomAggreagationPaginatedItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\?(\\d+)");
    private MongoOperations template;
    private Class<? extends T> type;
    private Sort sort;
    private String collection;

    public CustomAggreagationPaginatedItemReader() {
        super();
        setName(ClassUtils.getShortName(CustomAggreagationPaginatedItemReader.class));
    }

    public void setTemplate(MongoOperations template) {
        this.template = template;
    }

    public void setTargetType(Class<? extends T> type) {
        this.type = type;
    }

    public void setSort(Map<String, Sort.Direction> sorts) {
        this.sort = convertToSort(sorts);
    }

    public void setCollection(String collection) {
        this.collection = collection;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected Iterator<T> doPageRead() {
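        // 'page' and 'pageSize' are inherited from AbstractPaginatedDataItemReader.
        Pageable pageRequest = new PageRequest(page, pageSize, sort);

        // Ask the server to return results through a cursor, 100 documents per batch.
        BasicDBObject cursor = new BasicDBObject();
        cursor.append("batchSize", 100);

        // Offset of the current page; computed as a long to avoid int overflow.
        SkipOperation skipOperation = skip((long) pageRequest.getPageNumber() * pageRequest.getPageSize());

        Aggregation aggregation = newAggregation(
                // Include all your aggregation operations here.
                // Note: the 'sort' field is only carried in pageRequest and is not
                // applied to the pipeline; add a sort(...) stage here if page order matters.
                skipOperation,
                limit(pageRequest.getPageSize())
            ).withOptions(newAggregationOptions().cursor(cursor).build());

        return (Iterator<T>) template.aggregate(aggregation, collection, type).iterator();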
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.state(template != null, "An implementation of MongoOperations is required.");
        Assert.state(type != null, "A type to convert the input into is required.");
        Assert.state(collection != null, "A collection is required.");
    }

    private String replacePlaceholders(String input, List<Object> values) {
        Matcher matcher = PLACEHOLDER.matcher(input);
        String result = input;

        while (matcher.find()) {
            String group = matcher.group();
            int index = Integer.parseInt(matcher.group(1));
            result = result.replace(group, getParameterWithIndex(values, index));
        }

        return result;
    }

    private String getParameterWithIndex(List<Object> values, int index) {
        return JSON.serialize(values.get(index));
    }

    private Sort convertToSort(Map<String, Sort.Direction> sorts) {
        List<Sort.Order> sortValues = new ArrayList<Sort.Order>();

        for (Map.Entry<String, Sort.Direction> curSort : sorts.entrySet()) {
            sortValues.add(new Sort.Order(curSort.getValue(), curSort.getKey()));
        }

        return new Sort(sortValues);
    }
}

If you look closely, you can see that it was modeled on Spring Batch's own MongoItemReader (org.springframework.batch.item.data.MongoItemReader). A whole new class extending AbstractPaginatedDataItemReader is needed because the doPageRead method of MongoItemReader only uses the find operation of MongoTemplate, which makes it impossible to run aggregate operations through it.
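To make the paging in doPageRead easier to picture, here is a minimal plain-Java sketch of what the skip and limit stages do on each call. It has no Spring or MongoDB dependency: the class SkipLimitPaging and its methods offset and readPage are hypothetical names invented for this illustration, and an in-memory list stands in for the aggregation result.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for skip/limit paging; not part of Spring Batch.
public class SkipLimitPaging {

    // Number of documents to skip for a zero-based page.
    // The cast to long avoids int overflow for large page numbers.
    public static long offset(int page, int pageSize) {
        return (long) page * pageSize;
    }

    // Returns the items of one page, or an empty list once the source is exhausted.
    public static <T> List<T> readPage(List<T> source, int page, int pageSize) {
        long skip = offset(page, pageSize);
        if (skip >= source.size()) {
            return new ArrayList<>();
        }
        int to = (int) Math.min(source.size(), skip + pageSize);
        return new ArrayList<>(source.subList((int) skip, to));
    }

    public static void main(String[] args) {
        List<Integer> docs = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            docs.add(i);
        }
        // Keep asking for the next page until an empty page comes back,
        // which is roughly how a paginated reader detects the end of input.
        for (int page = 0; ; page++) {
            List<Integer> items = readPage(docs, page, 3);
            if (items.isEmpty()) {
                break;
            }
            System.out.println("page " + page + ": " + items);
        }
    }
}
```

With seven items and a page size of three, this prints three pages: [0, 1, 2], [3, 4, 5] and [6]. In the real reader the same arithmetic feeds skip(...) and limit(...), so the pipeline needs a stable order for consecutive pages not to overlap.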

The aggregation itself is defined in the overridden doPageRead method of the custom reader. Here is how you can configure that reader as a bean:

@Bean
public ItemReader<YourDataClass> reader(MongoTemplate mongoTemplate) {
    CustomAggreagationPaginatedItemReader<YourDataClass> customAggreagationPaginatedItemReader = new CustomAggreagationPaginatedItemReader<>();

    Map<String, Direction> sort = new HashMap<String, Direction>();
    sort.put("id", Direction.ASC);
        
    customAggreagationPaginatedItemReader.setTemplate(mongoTemplate);
    customAggreagationPaginatedItemReader.setCollection("collectionName");
    customAggreagationPaginatedItemReader.setTargetType(YourDataClass.class);
    customAggreagationPaginatedItemReader.setSort(sort);
        
    return customAggreagationPaginatedItemReader;
}
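One caveat if you ever sort on more than one field: convertToSort walks the map's entries in iteration order, and HashMap makes no guarantee about that order, whereas LinkedHashMap keeps insertion order. Below is a minimal plain-Java sketch of the idea. The class SortOrderSketch, the Direction enum (a stand-in for Sort.Direction) and the field names lastName and createdAt are all assumptions made up for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; does not use Spring Data.
public class SortOrderSketch {

    // Stand-in for org.springframework.data.domain.Sort.Direction.
    public enum Direction { ASC, DESC }

    // Mirrors the loop in convertToSort: one order per entry,
    // in whatever order the map hands the entries out.
    public static List<String> toOrderList(Map<String, Direction> sorts) {
        List<String> orders = new ArrayList<>();
        for (Map.Entry<String, Direction> entry : sorts.entrySet()) {
            orders.add(entry.getKey() + " " + entry.getValue());
        }
        return orders;
    }

    public static void main(String[] args) {
        // LinkedHashMap preserves insertion order, so the sort keys keep their priority.
        Map<String, Direction> sort = new LinkedHashMap<>();
        sort.put("lastName", Direction.ASC);
        sort.put("createdAt", Direction.DESC);
        sort.put("id", Direction.ASC);
        System.out.println(toOrderList(sort));
    }
}
```

For a single sort key, as in the bean above, a HashMap works just as well; the difference only shows up with several keys.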

As you may notice, you also need an instance of MongoTemplate. Here is how it should look:

@Bean
public MongoTemplate mongoTemplate(MongoDbFactory mongoDbFactory) {
    return new MongoTemplate(mongoDbFactory);
}

Here, MongoDbFactory is an object autowired by the Spring framework.

Hope that's enough to help you.

Upvotes: 4

Faraz

Reputation: 6275

Extend MongoItemReader and provide your own implementation of the doPageRead() method. This way you get full pagination support, and the reading of documents becomes part of a step.

public class CustomMongoItemReader<T, O> extends MongoItemReader<T> {
    private MongoTemplate template;
    private Class<? extends T> inputType;
    private Class<O> outputType;
    private MatchOperation match;
    private ProjectionOperation projection;
    private String collection;

    @Override
    @SuppressWarnings("unchecked")
    protected Iterator<T> doPageRead() {
        // page and pageSize come from AbstractPaginatedDataItemReader, which MongoItemReader extends.
        // The local variable must not be named 'page', or it would shadow the inherited field.
        Pageable pageRequest = PageRequest.of(page, pageSize);
        Aggregation agg = newAggregation(match, projection,
                skip((long) pageRequest.getPageNumber() * pageRequest.getPageSize()),
                limit(pageRequest.getPageSize()));
        return (Iterator<T>) template.aggregate(agg, collection, outputType).iterator();
    }
}

Add the getters, setters and other methods as needed; have a look at the source code of MongoItemReader for reference. I also removed Query support from it. You can keep that in the same method by copying it over from MongoItemReader. The same goes for Sort.

And in the class where you have a reader, you would do something like:

public MongoItemReader<Input> reader() {
    // Input and Output stand for your own document and result classes
    CustomMongoItemReader<Input, Output> reader = new CustomMongoItemReader<>();
    reader.setTemplate(mongoTemplate);
    reader.setName("abc");
    reader.setTargetType(Input.class);
    reader.setOutputType(Output.class);
    reader.setCollection(myMongoCollection);
    reader.setMatch(Aggregation.match(new Criteria()....));
    reader.setProjection(Aggregation.project("..",".."));
    return reader;
}

Upvotes: 6
