Hendy Irawan

Reputation: 21424

How to perform GridCache.forEach() concurrently and partitioned on all nodes?

With this code:

try (Grid grid = GridGain.start(AnswerYagoFactTests.class.getResource("yago.gridgain.xml"))) {
    GridCache<Integer, YagoRule> cache = grid.cache("yagoRules");
    grid.compute().broadcast(new Runnable() {
        @Override
        public void run() {
            try {
                log.info("Cache formerly has size={} offheap={} overflow={}",
                        cache.size(), cache.offHeapEntriesCount(), cache.overflowSize());
            } catch (GridException e) {
                log.error("Cannot get overflow size", e);
            }
        }
    }).get();

    log.info("1 is {}", cache.get(1));
    grid.compute().apply(new GridClosure<String, String>() {
        @Override
        public String apply(String e) {
            log.info("Found {} YAGO rules", cache.size());
            cache.forEach(new GridInClosure<GridCacheEntry<Integer,YagoRule>>() {
                @Override
                public void apply(GridCacheEntry<Integer, YagoRule> e) {
                    log.info("Processing rule #{} {}", e.getKey(), e.getValue());
                }
            });
            return null;
        }
    }, msg).get();
}

In a 3-node configuration, GridGain selects one node (seemingly at random) and then processes every "Processing rule" entry on that node only.

What I'd like to do is make the forEach parallel, so ideally for 3 nodes and 30 entries, each node should process 10 entries. The cache is partitioned so each node has its own entries.

Upvotes: 0

Views: 73

Answers (1)

Dmitriy

Reputation: 2292

In your code, instead of grid.compute().apply(...), try grid.compute().broadcast(...); the closure will then be broadcast to all nodes in the grid.

Also, if you need to iterate only over the primary set (excluding backups), you can do it as follows:

grid.compute().broadcast(new GridClosure<String, String>() {
    @Override public String apply(String arg) {
        // primaryEntrySet() covers only the entries this node holds as primary (no backups).
        for (GridCacheEntry<Integer, YagoRule> entry : cache.primaryEntrySet())
            log.info("Processing rule #{} {}", entry.getKey(), entry.getValue());
        return null;
    }
}, msg).get();
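To see the idea without a running grid, here is a minimal plain-Java sketch that only simulates it. It does not use the GridGain API: the class name PartitionedForEachSketch, the primaryNode() helper, and the key-modulo assignment are all assumptions made for illustration, and threads stand in for nodes. Each simulated node receives the same job and visits only the keys it owns as primary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PartitionedForEachSketch {

    // Hypothetical stand-in for cache affinity: maps a key to the index of its primary node.
    static int primaryNode(int key, int nodeCount) {
        return Math.floorMod(key, nodeCount);
    }

    // Simulates a broadcast: the same job runs once per node, and each run
    // visits only the keys for which that node is the primary owner.
    static Map<Integer, List<Integer>> processOnAllNodes(int nodeCount, int entryCount)
            throws Exception {
        Map<Integer, List<Integer>> processedByNode = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(nodeCount);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int node = 0; node < nodeCount; node++) {
                final int self = node;
                futures.add(pool.submit(() -> {
                    List<Integer> mine = new ArrayList<>();
                    for (int key = 0; key < entryCount; key++) {
                        if (primaryNode(key, nodeCount) == self) {
                            mine.add(key); // "process" the entry on its primary node
                        }
                    }
                    processedByNode.put(self, mine);
                }));
            }
            // Wait for every simulated node to finish, like get() on the broadcast future.
            for (Future<?> f : futures) {
                f.get();
            }
        } finally {
            pool.shutdown();
        }
        return new TreeMap<>(processedByNode);
    }

    public static void main(String[] args) throws Exception {
        processOnAllNodes(3, 30).forEach((node, keys) ->
                System.out.println("node " + node + " processed " + keys.size() + " entries"));
    }
}
```

With 3 simulated nodes and 30 keys, the modulo assignment gives each node exactly 10 keys and no key is visited twice. A real partitioned cache assigns keys through its own affinity function, so the split across nodes is not guaranteed to be this even.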

Upvotes: 1
