andrei
andrei

Reputation: 369

Developing a spark streaming application

so the problem i'm trying to tackle is the following:

I'm curious as to how one would approach such a task using spark streaming.

My current implementation uses 3 types of components: a custom receiver and two classes that implement Function, one for the neural nets, one for the end aggregator.

In broad strokes, my application is built as follows:

JavaReceiverInputDStream<...> rndLists = jssc.receiverStream(new JavaRandomReceiver(...));

Function<JavaRDD<...>, Void> aggregator = new JavaSyncBarrier(numberOfNets);

for(int i = 0; i < numberOfNets; i++){
    rndLists.map(new NeuralNetMapper(neuralNetConfig)).foreachRDD(aggregator);
}

The main problem i'm having with this, though, is that it runs faster in local mode than when submitted to a 4-node cluster.

Is my implementation wrong to begin with or is something else happening here ?

There's also a full post here http://apache-spark-user-list.1001560.n3.nabble.com/Developing-a-spark-streaming-application-td12893.html with more details regarding the implementation of each of the three components mentioned previously.

Upvotes: 3

Views: 734

Answers (1)

smola
smola

Reputation: 883

It seems there might be a lot of repetitive instantiation and serialization of objects. The later might be hitting your performance in a cluster.

You should try instantiating your neural networks only once. You will have to ensure that they are serializable. You should use flatMap instead of multiple maps + union. Something along these lines:

// Initialize neural net first
List<NeuralNetMapper> neuralNetMappers = new ArrayList<>(numberOfNets);
for(int i = 0; i < numberOfNets; i++){
    neuralNetMappers.add(new NeuralNetMapper(neuralNetConfig));
}

// Then create a DStream applying all of them
JavaDStream<Result> neuralNetResults = rndLists.flatMap(new FlatMapFunction<Item, Result>() {
    @Override
    public Iterable<Result> call(Item item) {
        List<Result> results = new ArrayList<>(numberOfNets);
        for (int i = 0; i < numberOfNets; i++) {
            results.add(neuralNetMappers.get(i).doYourNeuralNetStuff(item));
        }
        return results;
    }
});

// The aggregation stuff
neuralNetResults.foreachRDD(aggregator);

If you can afford to initialize the networks this way, you can save quite a lot of time. Also, the union stuff you included in your linked posts seems unnecessary and is penalizing your performance: a flatMap will do.

Finally, in order to further tune your performance in the cluster, you can use the Kryo serializer.

Upvotes: 5

Related Questions