Reputation: 684
I want to use a Hazelcast or Coherence EntryProcessor to run some logic in parallel on the different nodes where the keys are stored in a cache. I see I can use something like sendToEachKey(EntryProcessor process).
My problem is that, along with the logic, I also need to send a piece of data to process. That data belongs to another system and I receive it from outside (in an HTTP request, for example).
Sure, I can do something like sendToEachKey(EntryProcessor(data) process). But if the data is different for each key and I want to send each key only its own data, how can I do that? I want this because the data is too big and I am overloading the network.
Opening a thread pool and sending each piece of data to its key separately is possible, but it is inefficient because of the huge number of requests.
Thanks!
Upvotes: 1
Views: 308
Reputation: 3150
In Hazelcast you would be doing executeOnKeys(keys, new EntryProcessor(data)), and that is too much because the data is too big.
Why not
// MyEntryProcessor: your own implementation of EntryProcessor
map.executeOnKey(key1, new MyEntryProcessor(data1));
map.executeOnKey(key2, new MyEntryProcessor(data2));
map.executeOnKey(key3, new MyEntryProcessor(data3));
to send each key only the subset of the data it needs?
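To make the per-key idea concrete without a cluster, here is a stdlib-only sketch. The "cache" is a ConcurrentHashMap, and MyEntryProcessor and executeOnKey are hypothetical stand-ins; in real Hazelcast the processor would implement com.hazelcast.map.EntryProcessor and you would call IMap.executeOnKey. Each invocation carries only one key's slice of the external data. If one blocking round trip per key is a concern, IMap also has an asynchronous submitToKey variant.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Stdlib-only simulation of "one processor per key, carrying only that key's data".
// ConcurrentHashMap stands in for the distributed map; names are hypothetical.
public class PerKeyProcessing {

    // Hypothetical processor: holds the external data for ONE key only.
    static final class MyEntryProcessor implements BiFunction<String, Integer, Integer> {
        private final int externalData;

        MyEntryProcessor(int externalData) {
            this.externalData = externalData;
        }

        @Override
        public Integer apply(String key, Integer currentValue) {
            // Example logic: combine the cached value with this key's own data.
            return currentValue + externalData;
        }
    }

    // Stand-in for executeOnKey: atomically apply the processor to one existing entry.
    static void executeOnKey(Map<String, Integer> cache, String key, MyEntryProcessor processor) {
        cache.computeIfPresent(key, processor);
    }

    public static void main(String[] args) {
        Map<String, Integer> cache = new ConcurrentHashMap<>(Map.of("key1", 10, "key2", 20, "key3", 30));
        Map<String, Integer> external = Map.of("key1", 1, "key2", 2, "key3", 3);

        // One invocation per key, each shipping only that key's slice.
        for (Map.Entry<String, Integer> e : external.entrySet()) {
            executeOnKey(cache, e.getKey(), new MyEntryProcessor(e.getValue()));
        }
        System.out.println(cache.get("key1") + " " + cache.get("key2") + " " + cache.get("key3"));
    }
}
```

The trade-off is the one the question raises: payload per call is small, but the number of calls grows with the number of keys.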
Upvotes: 0
Reputation: 6104
For Hazelcast, you could retrieve all values and send each key its own EntryProcessor, but this will create a lot of overhead.
The other option would be to use a combination of EntryProcessor and our distributed ExecutorService.
You send a Runnable to an ExecutorService. Inside the Runnable you retrieve the local keyset, retrieve the external values for those keys (so they are fetched straight onto the node that owns them) and then you emit one EntryProcessor per local key. Since you're already local to the node, there's no more traffic flying around (apart from the backups, obviously :)). That said, you might want to implement a specific EntryProcessor that only transmits the changed value rather than the full processor itself (to save even more traffic).
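The flow above can be modelled in plain Java as a sketch. Node, fetchExternal and processCluster are hypothetical names: each simulated node owns a slice of the entries, one task runs per node (roughly what IExecutorService.executeOnAllMembers does in Hazelcast), and inside the task the node reads its own keys (the analogue of IMap.localKeySet()), fetches external data only for those keys, and applies the update locally. No real Hazelcast classes are used here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Stdlib-only simulation: one task per node, each processing only its local keys.
public class LocalKeyProcessing {

    // Simulated cluster member owning a slice of the cache.
    static final class Node {
        final Map<String, Integer> localEntries = new ConcurrentHashMap<>();

        Set<String> localKeySet() {
            return localEntries.keySet();
        }
    }

    // Hypothetical external system: returns data only for the requested keys.
    static Map<String, Integer> fetchExternal(Set<String> keys, Function<String, Integer> source) {
        Map<String, Integer> result = new HashMap<>();
        for (String key : keys) {
            result.put(key, source.apply(key));
        }
        return result;
    }

    static void processCluster(List<Node> nodes, Function<String, Integer> source) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, nodes.size()));
        for (Node node : nodes) {
            // One task per node, like executing a Runnable on every member.
            executor.execute(() -> {
                // Fetch external data for local keys only, then update locally.
                Map<String, Integer> external = fetchExternal(node.localKeySet(), source);
                external.forEach((k, v) -> node.localEntries.computeIfPresent(k, (key, cur) -> cur + v));
            });
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        Node a = new Node();
        Node b = new Node();
        a.localEntries.put("key1", 10);
        b.localEntries.put("key2", 20);
        processCluster(List.of(a, b), key -> key.length());
        System.out.println(a.localEntries + " " + b.localEntries);
    }
}
```

The caller only ships the task; the bulky data never passes through it, which is the point of doing the lookup on the owning node.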
Upvotes: 1
Reputation: 487
In Coherence you could use PartitionedService to find the association of cache keys to cluster members. Then you could invoke the entry processor with the data for each member, using PartitionedFilter to make sure that the data is sent only to that member. Something like this:
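import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PartitionedService;
import com.tangosol.net.partition.PartitionSet;
import com.tangosol.util.Filters;
import com.tangosol.util.InvocableMap.EntryProcessor;
import com.tangosol.util.filter.PartitionedFilter;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

NamedCache<String, Object> cache; // assumed to be obtained elsewhere

// keys in this map are also keys in cache
void processData(Map<String, Data> externalData) {
    PartitionedService partitionedService = (PartitionedService) cache.getCacheService();
    Map<Member, Map<String, Data>> dataForMembers = splitDataByMembers(partitionedService, externalData);
    for (Entry<Member, Map<String, Data>> dataForMember : dataForMembers.entrySet()) {
        Member member = dataForMember.getKey();
        Map<String, Data> data = dataForMember.getValue();
        if (data.isEmpty()) {
            continue; // nothing to send to this member
        }
        // restrict the invocation to the partitions this member owns
        PartitionSet partitions = partitionedService.getOwnedPartitions(member);
        PartitionedFilter filter = new PartitionedFilter<>(Filters.always(), partitions);
        // the filter matches every entry in those partitions, so MyEntryProcessor
        // should skip entries whose key has no corresponding value in data
        EntryProcessor processor = new MyEntryProcessor(data);
        cache.async().invokeAll(filter, processor);
    }
}

Map<Member, Map<String, Data>> splitDataByMembers(
        PartitionedService partitionedService,
        Map<String, Data> externalData) {
    Map<Member, Map<String, Data>> dataForMembers = new HashMap<>();
    // getServiceMembers() returns a raw Set, hence the cast
    for (Object member : partitionedService.getInfo().getServiceMembers()) {
        dataForMembers.put((Member) member, new HashMap<>());
    }
    for (Entry<String, Data> dataForKey : externalData.entrySet()) {
        Member member = partitionedService.getKeyOwner(dataForKey.getKey());
        dataForMembers.get(member).put(dataForKey.getKey(), dataForKey.getValue());
    }
    return dataForMembers;
}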
This way there will be only one entry processor invocation for each member in the cluster, and each member will get only the data it is interested in.
I used String as the cache key and an arbitrary Data type for the data associated with each key, but you could of course use any other types (and you don't have to model the external data as a map at all).
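The grouping step (splitDataByMembers) can be tried out without a Coherence cluster. The sketch below is a stdlib-only model under two loud assumptions: a key's partition is its hashCode modulo the partition count, and partitions are assigned to members round-robin. Coherence's real key partitioning and ownership work differently; the names partitionOf and ownerOf are hypothetical. What carries over is the invariant: every key lands in exactly one member's map, the map of the member that owns the key's partition.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only model of splitDataByMembers. The partitioning and ownership rules
// below are illustrative assumptions, not Coherence's actual algorithms.
public class SplitByOwner {

    // Assumption: partition = hashCode mod partitionCount (non-negative).
    static int partitionOf(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Assumption: partitions are spread over members round-robin.
    static String ownerOf(int partition, List<String> members) {
        return members.get(partition % members.size());
    }

    static <D> Map<String, Map<String, D>> splitDataByMembers(
            List<String> members, int partitionCount, Map<String, D> externalData) {
        Map<String, Map<String, D>> dataForMembers = new HashMap<>();
        for (String member : members) {
            dataForMembers.put(member, new HashMap<>());
        }
        for (Map.Entry<String, D> e : externalData.entrySet()) {
            String owner = ownerOf(partitionOf(e.getKey(), partitionCount), members);
            dataForMembers.get(owner).put(e.getKey(), e.getValue());
        }
        return dataForMembers;
    }

    public static void main(String[] args) {
        List<String> members = List.of("member-A", "member-B", "member-C");
        Map<String, String> external = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            external.put("key" + i, "payload" + i);
        }
        Map<String, Map<String, String>> split = splitDataByMembers(members, 257, external);
        // One invocation per member would carry exactly one of these slices.
        split.forEach((member, slice) -> System.out.println(member + " -> " + slice.keySet()));
    }
}
```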
Upvotes: 1