Reputation: 8249
Being new to Hazelcast Jet, I was trying to build a setup where a single item from an infinite source (i.e. a Map Journal of user requests) is MapReduced against a (possibly changing and) huge Map of reference items.
Specifically, for this example I want to determine the IDs of the vectors (read: float[]) with the smallest Euclidean distance to a user-defined input vector (the query), taken from a map of vectors (the references).
If implemented naively on a single machine, this would mean going through the Map items of the references and determining the Euclidean distance to the query for each of them, while keeping the k smallest matches. The input is taken from a user request (HTTP POST, button click, etc.) and the result set is available immediately after the computation finishes.
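For illustration, a rough single-machine sketch of that naive variant (the names are mine: referenceVectors is a plain java.util.Map<Long, float[]>, maxResults is the k to keep):
static List<Map.Entry<Long, Float>> nearestNaive(Map<Long, float[]> referenceVectors,
                                                 float[] queryVector, int maxResults) {
    // head of the queue = current worst of the k best matches
    PriorityQueue<Map.Entry<Long, Float>> best =
            new PriorityQueue<>(Map.Entry.<Long, Float>comparingByValue().reversed());
    for (Map.Entry<Long, float[]> e : referenceVectors.entrySet()) {
        float[] ref = e.getValue();
        float distance = 0.0f;
        for (int i = 0; i < queryVector.length; ++i) {
            distance += (queryVector[i] - ref[i]) * (queryVector[i] - ref[i]);
        }
        best.add(new AbstractMap.SimpleEntry<>(e.getKey(), (float) Math.sqrt(distance)));
        if (best.size() > maxResults) {
            best.remove();                         // evict the current worst match
        }
    }
    List<Map.Entry<Long, Float>> result = new ArrayList<>(best);
    result.sort(Map.Entry.comparingByValue());     // closest first
    return result;
}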
My recent approach was to:
- .distributed().broadcast() the request to the mapping job
- have each mapper work on the .localKeySet() of the reference vectors
- use .partitioned(item -> item.requestId) partitioning for the intermediate results
Conceptually, every query here is a batch of size 1 and I'm actually processing batches as they come. However, I have massive trouble letting the mappers and reducers know when a batch is done, so that the collectors know when they are done (and can emit the final result).
I tried using watermarks, both with real and fake timestamps (obtained automatically via an AtomicLong instance), and emitting from the tryProcessWm functions, but that seems to be a very brittle solution, as some of the events are dropped. I also need to make sure that no two requests are interleaved (i.e. by partitioning on the request ID), but at the same time have the mapper run on all nodes ...
How would I attack this task?
Edit #1:
Right now, my mapper looks like this:
private static class EuclideanDistanceMapP extends AbstractProcessor {

    private IMap<Long, float[]> referenceVectors;

    final ScoreComparator comparator = new ScoreComparator();

    @Override
    protected void init(@Nonnull Context context) throws Exception {
        this.referenceVectors = context.jetInstance().getMap(REFERENCE_VECTOR_MAP_NAME);
        super.init(context);
    }

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        final Tuple3<Long, Long, float[]> query = (Tuple3<Long, Long, float[]>) item;
        final long requestId = query.f0();
        final long timestamp = query.f1();
        final float[] queryVector = query.f2();

        final TreeSet<Tuple2<Long, Float>> buffer = new TreeSet<>(comparator);
        for (Long vectorKey : referenceVectors.localKeySet()) {
            float[] referenceVector = referenceVectors.get(vectorKey);
            float distance = 0.0f;
            for (int i = 0; i < queryVector.length; ++i) {
                distance += (queryVector[i] - referenceVector[i]) * (queryVector[i] - referenceVector[i]);
            }
            final Tuple2<Long, Float> score = Tuple2.tuple2(vectorKey, (float) Math.sqrt(distance));
            if (buffer.size() < MAX_RESULTS) {
                buffer.add(score);
                continue;
            }
            // If the value is larger than the largest entry, discard it.
            if (comparator.compare(score, buffer.last()) >= 0) {
                continue;
            }
            // Otherwise we remove the largest entry after adding the new one.
            buffer.add(score);
            buffer.pollLast();
        }

        return tryEmit(Tuple3.tuple3(requestId, timestamp, buffer.toArray()));
    }

    private static class ScoreComparator implements Comparator<Tuple2<Long, Float>> {
        @Override
        public int compare(Tuple2<Long, Float> a, Tuple2<Long, Float> b) {
            return Float.compare(a.f1(), b.f1());
        }
    }
}
The reducer is essentially replicating that (minus the vector calculation, of course).
Edit #2:
Here's the DAG setup. It currently fails when there are more than a handful of concurrent requests. Most of the items are dropped due to the watermarks.
DAG dag = new DAG();
Vertex sourceStream = dag.newVertex("source",
        SourceProcessors.<Long, float[], Tuple2<Long, float[]>>streamMapP(QUERY_VECTOR_MAP_NAME,
                e -> e.getType() == EntryEventType.ADDED || e.getType() == EntryEventType.UPDATED,
                e -> Tuple2.tuple2(e.getKey(), e.getNewValue()), true));
// simple map() using an AtomicLong to create the timestamp
Vertex addTimestamps = dag.newVertex("addTimestamps", AddTimestampMapP::new);
// the class shown above
Vertex map = dag.newVertex("map", EuclideanDistanceMapP::new);
Vertex insertWatermarks = dag.newVertex("insertWatermarks",
        insertWatermarksP((Tuple3<Long, Long, float[]> t) -> t.f1(), withFixedLag(0), emitByMinStep(1)));
Vertex combine = dag.newVertex("combine", CombineP::new);
// simple map() that drops the timestamp
Vertex removeTimestamps = dag.newVertex("removeTimestamps", RemoveTimestampMapP::new);
// Using a list here for testing.
Vertex sink = dag.newVertex("sink", SinkProcessors.writeListP(SINK_NAME));

dag.edge(between(sourceStream, addTimestamps))
   .edge(between(addTimestamps, map.localParallelism(1))
           .broadcast()
           .distributed())
   .edge(between(map, insertWatermarks).isolated())
   .edge(between(insertWatermarks, combine.localParallelism(1))
           .distributed()
           .partitioned((Tuple3<Long, Long, Tuple2<Long, Float>[]> item) -> item.f0()))
   .edge(between(combine, removeTimestamps)
           .partitioned((Tuple3<Long, Long, Tuple2<Long, Float>[]> item) -> item.f0()))
   .edge(between(removeTimestamps, sink.localParallelism(1)));
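AddTimestampMapP and RemoveTimestampMapP are not shown here; they are just the simple mapping processors mentioned in the comments, roughly along these lines:
// Attaches a monotonically increasing pseudo-timestamp from an AtomicLong
// to each (requestId, queryVector) pair coming from the source ...
private static class AddTimestampMapP extends AbstractProcessor {
    private static final AtomicLong TIMESTAMP = new AtomicLong();

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        final Tuple2<Long, float[]> query = (Tuple2<Long, float[]>) item;
        return tryEmit(Tuple3.tuple3(query.f0(), TIMESTAMP.incrementAndGet(), query.f1()));
    }
}

// ... and strips the timestamp again after the combine stage.
private static class RemoveTimestampMapP extends AbstractProcessor {
    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        final Tuple3<Long, Long, Tuple2<Long, Float>[]> result =
                (Tuple3<Long, Long, Tuple2<Long, Float>[]>) item;
        return tryEmit(Tuple2.tuple2(result.f0(), result.f2()));
    }
}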
Edit #3:
Here's my current combiner implementation. I assume that all items will be ordered according to the watermarks, or in general that only items of the same request will be collected by the same combiner instance. This doesn't seem to be true, though ...
private static class CombineP extends AbstractProcessor {

    private final ScoreComparator comparator = new ScoreComparator();
    private final TreeSet<Tuple2<Long, Float>> buffer = new TreeSet<>(comparator);
    private Long requestId;
    private Long timestamp = -1L;

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        final Tuple3<Long, Long, Tuple2<Long, Float>[]> itemTuple = (Tuple3<Long, Long, Tuple2<Long, Float>[]>) item;
        requestId = itemTuple.f0();
        final long currentTimestamp = itemTuple.f1();
        if (currentTimestamp > timestamp) {
            buffer.clear();
        }
        timestamp = currentTimestamp;

        final Object[] scores = itemTuple.f2();
        for (Object scoreObj : scores) {
            final Tuple2<Long, Float> score = (Tuple2<Long, Float>) scoreObj;
            if (buffer.size() < MAX_RESULTS) {
                buffer.add(score);
                continue;
            }
            // If the value is larger than the largest entry, discard it.
            if (comparator.compare(score, buffer.last()) >= 0) {
                continue;
            }
            // Otherwise we remove the largest entry after adding the new one.
            buffer.add(score);
            buffer.pollLast();
        }
        return true;
    }

    @Override
    protected boolean tryProcessWm(int ordinal, @Nonnull Watermark wm) {
        // return super.tryProcessWm(ordinal, wm);
        return tryEmit(Tuple3.tuple3(requestId, timestamp, buffer.toArray())) && super.tryProcessWm(ordinal, wm);
    }

    private static class ScoreComparator implements Comparator<Tuple2<Long, Float>> {
        @Override
        public int compare(Tuple2<Long, Float> a, Tuple2<Long, Float> b) {
            return Float.compare(a.f1(), b.f1());
        }
    }
}
Upvotes: 2
Views: 182
Reputation: 10812
You must always remember that items between two vertices can be reordered. When you have parallel requests, their intermediate results can be interleaved in CombineP.
In CombineP, you can rely on the fact that the number of intermediate results per request is equal to the number of members in the cluster. Calculate the number of participating members in init() from totalParallelism / localParallelism (for example, with 3 members and a local parallelism of 4, the total parallelism is 12 and 12 / 4 = 3). When you have received that many intermediate results for a request, you can emit the final result.
Another trick might be to run multiple requests in parallel on each member. You can achieve this by using two edges:
1. a broadcast + distributed edge to a parallelism=1 processor
2. a unicast edge to a parallelism=N processor
Also note that localKeySet() is not suitable for huge maps: the size of the query result is limited.
Here's the code for the above (written for Jet 0.5).
The DAG:
DAG dag = new DAG();
Vertex sourceStream = dag.newVertex("source",
        streamMapP(QUERY_VECTOR_MAP_NAME,
                e -> e.getType() == EntryEventType.ADDED || e.getType() == EntryEventType.UPDATED,
                e -> entry(e.getKey(), e.getNewValue()), true));
Vertex identity = dag.newVertex("identity", mapP(identity()))
        .localParallelism(1);
Vertex map = dag.newVertex("map", peekOutputP(EuclideanDistanceMapP::new));
Vertex combine = dag.newVertex("combine", peekOutputP(new CombineMetaSupplier()));
Vertex sink = dag.newVertex("sink", writeListP(SINK_NAME));

dag.edge(between(sourceStream, identity)
           .broadcast()
           .distributed())
   .edge(between(identity, map))
   .edge(between(map, combine)
           .distributed()
           .partitioned((Entry item) -> item.getKey()))
   .edge(between(combine, sink));
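A rough usage sketch (not part of the code above; it assumes the map and list names from the question and that the event journal is enabled for the query map):
// Start a Jet member, load reference vectors, submit the DAG and
// issue a query by putting an entry into the journaled query map.
JetInstance jet = Jet.newJetInstance();

IMap<Long, float[]> references = jet.getMap(REFERENCE_VECTOR_MAP_NAME);
references.put(1L, new float[] {0f, 0f});
references.put(2L, new float[] {1f, 1f});

jet.newJob(dag);                                  // submit the streaming job

IMap<Long, float[]> queries = jet.getMap(QUERY_VECTOR_MAP_NAME);
queries.put(42L, new float[] {0.1f, 0.2f});       // requestId -> query vector

// the combined results (requestId -> sorted scores) end up in the sink list
List<Entry<Long, Entry<Long, Float>[]>> results = jet.getList(SINK_NAME);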
EuclideanDistanceMapP class:
private static class EuclideanDistanceMapP extends AbstractProcessor {

    private IMap<Long, float[]> referenceVectors;
    final ScoreComparator comparator = new ScoreComparator();
    private Object pendingItem;

    @Override
    protected void init(@Nonnull Context context) throws Exception {
        this.referenceVectors = context.jetInstance().getMap(REFERENCE_VECTOR_MAP_NAME);
        super.init(context);
    }

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        if (pendingItem == null) {
            final Entry<Long, float[]> query = (Entry<Long, float[]>) item;
            final long requestId = query.getKey();
            final float[] queryVector = query.getValue();
            final PriorityQueue<Entry<Long, Float>> buffer = new PriorityQueue<>(comparator.reversed());
            for (Long vectorKey : referenceVectors.localKeySet()) {
                float[] referenceVector = referenceVectors.get(vectorKey);
                float distance = 0.0f;
                for (int i = 0; i < queryVector.length; ++i) {
                    distance += (queryVector[i] - referenceVector[i]) * (queryVector[i] - referenceVector[i]);
                }
                final Entry<Long, Float> score = entry(vectorKey, (float) Math.sqrt(distance));
                if (buffer.size() < MAX_RESULTS || comparator.compare(score, buffer.peek()) < 0) {
                    if (buffer.size() == MAX_RESULTS)
                        buffer.remove();
                    buffer.add(score);
                }
            }
            pendingItem = entry(requestId, buffer.toArray(new Entry[0]));
        }
        if (tryEmit(pendingItem)) {
            pendingItem = null;
            return true;
        }
        return false;
    }
}
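MAX_RESULTS and ScoreComparator are reused from the question, except that here the comparator works on Map.Entry rather than Tuple2, so presumably something like the sketch below (entry() being Jet's Util.entry() convenience factory):
// Assumed counterpart of the question's ScoreComparator, over Map.Entry:
private static class ScoreComparator implements Comparator<Entry<Long, Float>> {
    @Override
    public int compare(Entry<Long, Float> a, Entry<Long, Float> b) {
        return Float.compare(a.getValue(), b.getValue());
    }
}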
CombineP class:
private static class CombineP extends AbstractProcessor {

    private final ScoreComparator comparator = new ScoreComparator();
    private final Map<Long, PriorityQueue<Entry<Long, Float>>> buffer = new HashMap<>();
    private final Map<Long, Integer> accumulatedCount = new HashMap<>();
    private final int upstreamMemberCount;
    private Entry<Long, Entry<Long, Float>[]> pendingItem;

    private CombineP(int upstreamMemberCount) {
        this.upstreamMemberCount = upstreamMemberCount;
    }

    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        if (pendingItem == null) {
            final Entry<Long, Entry<Long, Float>[]> localValue = (Entry<Long, Entry<Long, Float>[]>) item;
            long requestId = localValue.getKey();
            PriorityQueue<Entry<Long, Float>> globalValue =
                    buffer.computeIfAbsent(requestId, key -> new PriorityQueue<>(comparator.reversed()));
            globalValue.addAll(asList(localValue.getValue()));
            while (globalValue.size() > MAX_RESULTS) {
                globalValue.remove();
            }
            int count = accumulatedCount.merge(requestId, 1, Integer::sum);
            if (count == upstreamMemberCount) {
                // we've received enough local values, let's emit and remove the accumulator
                pendingItem = entry(requestId, globalValue.toArray(new Entry[0]));
                Arrays.sort(pendingItem.getValue(), comparator);
                buffer.remove(requestId);
                accumulatedCount.remove(requestId);
            } else {
                return true;
            }
        }
        if (tryEmit(pendingItem)) {
            pendingItem = null;
            return true;
        }
        return false;
    }
}
You also need a custom meta-supplier for CombineP:
private static class CombineMetaSupplier implements ProcessorMetaSupplier {

    private int upstreamMemberCount;

    @Override
    public void init(@Nonnull Context context) {
        upstreamMemberCount = context.totalParallelism() / context.localParallelism();
    }

    @Nonnull
    @Override
    public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> addresses) {
        return address -> ProcessorSupplier.of(() -> new CombineP(upstreamMemberCount));
    }
}
Upvotes: 1