I'm evaluating Hazelcast Jet for my project, but I found the documentation quite vague on the following topics:
1) When I perform a hash join on two list sources, for example:
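import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.pipeline.*;
import java.util.Map.Entry;
import static com.hazelcast.jet.pipeline.JoinClause.joinMapEntries;
import static com.hazelcast.jet.pipeline.Sources.list;

Pipeline p = Pipeline.create();
BatchStage<Trade> trades = p.drawFrom(list("trades"));
BatchStage<Entry<Integer, Broker>> brokers =
        p.drawFrom(list("brokers"));
BatchStage<Tuple2<Trade, Broker>> joined = trades.hashJoin(brokers,
        joinMapEntries(Trade::brokerId),
        Tuple2::tuple2);
joined.drainTo(Sinks.logger());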
can I somehow tell Jet which kind of join will actually happen underneath: a map-side join or a reduce-side join? Imagine that the "brokers" set is small and the "trades" set is really huge. The optimal technique for joining these two sets is a map-side join, also known as a broadcast join. What data will Jet transfer over the network when it performs the join? Are there any size-based optimizations?
2) I was testing the following scenario with a simple pipeline:
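import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import java.util.Date;

private Pipeline createPipeLine() {
    Pipeline p = Pipeline.create();
    BatchStage<Date> stage = p.drawFrom(Sources.<Date>list("master"));
    stage.drainTo(Sinks.logger());
    return p;
}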
The list("master") is constantly being filled by another node in the cluster. When I submit this pipeline to the cluster, only a subset of list("master") is drained to the logger. Can I somehow make the Jet job keep draining list("master") to standard output continuously?
Thanks in advance
From the Javadoc of hashJoin:
Implementationally, the hash-join transform is optimized for throughput so that each computing member has a local copy of all the enriching data, stored in hashtables (hence the name). The enriching streams are consumed in full before ingesting any data from the primary stream.
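In your example, every member first consumes the whole brokers list into its local hashtable, and only then does the trades list start being consumed. So hashJoin already behaves like the map-side (broadcast) join you describe: the enriching data (brokers) is what gets copied to every member. There is no size-based switch between strategies, so put the small data set on the enriching side of the join.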
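To make the two phases concrete, here is a plain-Java sketch of a broadcast hash join. It is not Jet's implementation; the record types Trade, Broker and Joined are hypothetical stand-ins for the question's classes, and keeping unmatched trades with a null broker is a choice made for this sketch. In a cluster, the build phase would run on every member against its own copy of the brokers table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastHashJoinSketch {

    // Hypothetical stand-ins for the question's Trade and Broker classes.
    record Trade(int id, int brokerId) {}
    record Broker(int id, String name) {}
    record Joined(Trade trade, Broker broker) {}

    // Build phase: the whole (small) enriching side is loaded into a hashtable
    // before any primary item is looked at.
    static Map<Integer, Broker> buildTable(List<Broker> brokers) {
        Map<Integer, Broker> table = new HashMap<>();
        for (Broker b : brokers) {
            table.put(b.id(), b);
        }
        return table;
    }

    // Probe phase: primary items are streamed through one at a time and
    // matched with a single hashtable lookup each.
    static List<Joined> probe(Map<Integer, Broker> table, Iterable<Trade> trades) {
        List<Joined> out = new ArrayList<>();
        for (Trade t : trades) {
            // A trade with no matching broker is kept with a null broker.
            out.add(new Joined(t, table.get(t.brokerId())));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Broker> brokers = List.of(new Broker(10, "Acme"), new Broker(20, "Globex"));
        List<Trade> trades = List.of(new Trade(1, 10), new Trade(2, 20), new Trade(3, 99));
        Map<Integer, Broker> table = buildTable(brokers);
        for (Joined j : probe(table, trades)) {
            System.out.println(j);
        }
    }
}
```

In this sketch the trades are never grouped or repartitioned by broker id; that is the property that separates a broadcast join from a reduce-side join, and it only pays off while the hashtable side stays small.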
IList is a batch source: the job reads the items that are in the list at that moment and then completes. To consume items continuously you need a streaming source. You can use an IQueue as a source; here is a simple way to create a source for a queue:
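StreamSource<Trade> queueSource = SourceBuilder.<IQueue<Trade>>stream("queueStream",
        c -> c.jetInstance().getHazelcastInstance().getQueue("trades"))
        // poll() returns null on an empty queue, so only non-null items are emitted
        .<Trade>fillBufferFn((queue, buf) -> {
            Trade item = queue.poll();
            if (item != null) buf.add(item);
        })
        .build();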
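IQueue extends java.util.concurrent.BlockingQueue, so the polling behaviour of the fill function can be tried out with a plain JDK queue. The sketch below models one call of a streaming fill function: it moves whatever is currently queued, up to a batch limit, and returns at once, possibly with zero items. The class name, the fillBuffer helper and MAX_BATCH are hypothetical; in a real SourceBuilder fill function you would still hand each drained item to buf.add(...).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueFillSketch {

    // Hypothetical cap on items moved per call, so a single call stays short.
    static final int MAX_BATCH = 128;

    // Models one call of a streaming fill function: take what is queued right now
    // (at most MAX_BATCH items) without blocking. drainTo never yields nulls,
    // unlike an unchecked poll() on an empty queue.
    static <T> int fillBuffer(BlockingQueue<T> queue, List<T> buffer) {
        return queue.drainTo(buffer, MAX_BATCH);
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> buffer = new ArrayList<>();

        // Empty queue: nothing is emitted and the caller simply calls again later.
        int first = fillBuffer(queue, buffer);

        queue.add("trade-1");
        queue.add("trade-2");
        // The next call picks up the items that arrived in the meantime.
        int second = fillBuffer(queue, buffer);

        System.out.println(first + " " + second + " " + buffer); // prints "0 2 [trade-1, trade-2]"
    }
}
```

Draining in batches instead of one poll() per call also reduces the number of round trips when the queue is a distributed IQueue.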