Reputation: 21424
try (Grid grid = GridGain.start(AnswerYagoFactTests.class.getResource("yago.gridgain.xml"))) {
GridCache<Integer, YagoRule> cache = grid.cache("yagoRules");
grid.compute().broadcast(new Runnable() {
@Override
public void run() {
try {
log.info("Cache formerly has size={} offheap={} overflow={}",
cache.size(), cache.offHeapEntriesCount(), cache.overflowSize());
} catch (GridException e) {
log.error("Cannot get overflow size", e);
}
}
}).get();
log.info("1 is {}", cache.get(1));
grid.compute().apply(new GridClosure<String, String>() {
@Override
public String apply(String e) {
log.info("Found {} YAGO rules", cache.size());
cache.forEach(new GridInClosure<GridCacheEntry<Integer,YagoRule>>() {
@Override
public void apply(GridCacheEntry<Integer, YagoRule> e) {
log.info("Processing rule #{} {}", e.getKey(), e.getValue());
}
});
return null;
}
}, msg).get();
}
In 3 node configuration. GridGain selects one node (seem to be random), then processes each "Processing rule" in that node only.
What I'd like to do is make the forEach
parallel, so ideally for 3 nodes and 30 entries, each node should process 10 entries. The cache is partitioned
so each node has its own entries.
Upvotes: 0
Views: 73
Reputation: 2292
In your code, instead of grid.compute().apply(...)
try using grid.compute().broadcast(...)
and the closure will be broadcast to all nodes in the grid.
Also if you need to iterate only through primary set (excluding backups), you can do it as following:
grid.compute().broadcast(new GridClosure<String, Integer>() {
@Override public String apply(String e) {
for (GridCacheEntry<Integer, YagoRule> e : cache.primaryEntrySet()
log.info("Processing rule #{} {}", e.getKey(), e.getValue());
}
}, msg).get();
Upvotes: 1