Dev Loper

Reputation: 229

Spark mapWithState stateSnapshots not scaling (Java)

I am using Spark to consume a Kafka stream carrying status messages from IoT devices, which send regular health updates and the state of their various sensors. My Spark application listens to a single topic and reads the update messages through a Kafka direct stream. I need to trigger different alarms based on the state of each device's sensors. However, when I add more IoT devices sending data through Kafka, Spark does not scale, even after adding more machines and increasing the number of executors. Below is a stripped-down version of my Spark application, with the notification-triggering part removed; it shows the same performance issues.

// Updates the device state. DeviceState is just an in-memory object that tracks the state of one device.
private static Optional<DeviceState> trackDeviceState(Time time, String key, Optional<ProtoBufEventUpdate> updateOpt,
        State<DeviceState> state) {
    int batchTime = toSeconds(time);
    ProtoBufEventUpdate eventUpdate = (updateOpt == null) ? null : updateOpt.orNull();
    if (eventUpdate != null)
        eventUpdate.setBatchTime(ProximityUtil.toSeconds(time));
    if (state != null && state.exists()) {
        DeviceState deviceState = state.get();
        if (state.isTimingOut()) {
            deviceState.markEnd(batchTime);
        }
        if (updateOpt.isPresent()) {
            deviceState = DeviceState.updatedDeviceState(deviceState, eventUpdate);
            state.update(deviceState);
        }
    } else if (updateOpt.isPresent()) {
        DeviceState deviceState = DeviceState.newDeviceState(eventUpdate);
        state.update(deviceState);
        return Optional.of(deviceState);
    }

    return Optional.absent();
}
SparkConf conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    .set("spark.rpc.netty.dispatcher.numThreads", String.valueOf(Runtime.getRuntime().availableProcessors()));

JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10));

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "192.168.60.20:2181,192.168.60.21:2181,192.168.60.22:2181");
kafkaParams.put("metadata.broker.list", "192.168.60.20:9092,192.168.60.21:9092,192.168.60.22:9092");
kafkaParams.put("group.id", "spark_iot");

HashSet<String> topics = new HashSet<>();
topics.add("iottopic");

JavaPairInputDStream<String, ProtoBufEventUpdate> inputStream = KafkaUtils.createDirectStream(
        context, String.class, ProtoBufEventUpdate.class, KafkaKryoCodec.class, ProtoBufEventUpdateCodec.class,
        kafkaParams, topics);

JavaPairDStream<String, ProtoBufEventUpdate> updatesStream = inputStream.mapPartitionsToPair(t -> {
    List<Tuple2<String, ProtoBufEventUpdate>> eventUpdateList = new ArrayList<>();
    t.forEachRemaining(tuple -> {
        String key = tuple._1;
        ProtoBufEventUpdate eventUpdate = tuple._2;
        Util.mergeStateFromStats(eventUpdate);
        eventUpdateList.add(new Tuple2<String, ProtoBufEventUpdate>(key, eventUpdate));
    });
    return eventUpdateList.iterator();
});

JavaMapWithStateDStream<String, ProtoBufEventUpdate, DeviceState, DeviceState> deviceMapStream =
        updatesStream.mapWithState(StateSpec.function(Engine::trackDeviceState)
                                            .numPartitions(20)
                                            .timeout(Durations.seconds(1800)));
deviceMapStream.checkpoint(new Duration(batchDuration * 1000));


JavaPairDStream<String, DeviceState> deviceStateStream = deviceMapStream
                .stateSnapshots()
                .cache();

deviceStateStream.foreachRDD(rdd -> {
    if (rdd != null && !rdd.isEmpty()) {
        rdd.foreachPartition(partition -> {
            partition.forEachRemaining(t -> {
                SparkExecutorLog.error("Engine::getUpdates Tuple data " + t._2);
            });
        });
    }
});

Even as the load increases, I don't see CPU usage going up on the executor instances; most of the time the executors' CPUs are idle. I tried increasing the number of Kafka partitions (Kafka currently has 72 partitions; I also tried bringing it down to 36), and I tried increasing the number of partitions for deviceMapStream, but I couldn't see any performance improvement. The code is not spending any time on I/O.

I am running the Spark application with 6 executor instances on Amazon EMR (YARN), each machine having 4 cores and 32 GB of RAM. I tried increasing the number of executor instances to 9 and then to 15, but didn't see any performance improvement. I also played around with spark.default.parallelism, setting it to 20, 36, 72 and 100; 20 gave the best performance (maybe the number of cores per executor has some influence on this).

spark-submit --deploy-mode cluster --class com.ajay.Engine --supervise --driver-memory 5G --driver-cores 8 --executor-memory 4G --executor-cores 4 --conf spark.default.parallelism=20 --num-executors 36 --conf spark.dynamicAllocation.enabled=false --conf spark.streaming.unpersist=false --conf spark.eventLog.enabled=false --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties --conf spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError --conf spark.executor.extraJavaOptions=-XX:HeapDumpPath=/tmp --conf spark.executor.extraJavaOptions=-XX:+UseG1GC --conf spark.driver.extraJavaOptions=-XX:+UseG1GC --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties s3://test/engine.jar

At present Spark is struggling to complete the processing within 10 seconds (I have also tried different batch durations like 5, 10 and 15 seconds). It is taking 15-23 seconds to complete one batch at an input rate of 1600 records per second, which is about 17000 records per batch. I need the stateSnapshots stream to check the state of the devices periodically, to see whether a device is raising any alarms or whether any sensors have stopped responding. How can I improve the performance of my Spark application?

Upvotes: 4

Views: 534

Answers (1)

Utgarda

Reputation: 684

mapWithState does the following:

applying a function to every key-value element of this stream, while maintaining some state data for each unique key

as per its docs: PairDStreamFunctions#mapWithState

This also means that, within every batch, all the elements with the same key are processed in sequence. Because the function in StateSpec is arbitrary and provided by us, with no state combiners defined, it can't be parallelized any further, no matter how you partition the data before mapWithState. In other words, when the keys are diverse, parallelization is good; but if all the RDD elements share just a few unique keys, the whole batch is mostly processed by a number of cores equal to the number of unique keys.
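To make the routing concrete, here is a minimal sketch, not from the post (the key name is made up, and the partition count is taken from your numPartitions(20)), of roughly how a hash partitioner assigns a key to a partition:

    // Illustration only: mapWithState hashes each key into one of the configured
    // partitions, so every update for the same device key lands in the same
    // partition and is applied one record at a time inside a single task.
    int numPartitions = 20;
    String hotKey = "device-42";                    // assumed key that dominates a batch
    int rawMod = hotKey.hashCode() % numPartitions;
    int partition = rawMod < 0 ? rawMod + numPartitions : rawMod;
    // Every record carrying hotKey is routed to this one partition, no matter how
    // many executors or cores the cluster has.

Adding machines or executors cannot speed up the updates for such a key; they still run one after another.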

In your case, keys come from Kafka:

            t.forEachRemaining(tuple->{
                String key=tuple._1;

and your code snippet doesn't show how they are generated.

From my experience, this is what may be happening: part of each batch is processed quickly across multiple cores, while another part, in which a single key accounts for a substantial fraction of the records, takes much longer and delays the whole batch. That is why you see only a few tasks running most of the time while the executors sit under-loaded.

To see whether this is true, check your key distribution: how many elements are there for each key? Could it be that just a couple of keys hold 20% of all the elements (see the sketch after this list for one way to check)? If so, you have these options:

  • change your key-generation algorithm
  • artificially split problematic keys before mapWithState and combine the state snapshots later so they still make sense as a whole
  • cap the number of elements with the same key processed in each batch: either ignore elements after the first N in every batch, or send them elsewhere, e.g. into a "can't process in time" Kafka stream, and deal with them separately
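As a starting point for that check, here is a rough diagnostic sketch, assuming the updatesStream pair DStream and the SparkExecutorLog logger from your question (the log message text is made up); it counts records per key in every batch and logs the heaviest keys on the driver:

    // Rough sketch, not part of the original code: count records per device key
    // in each batch and log the ten heaviest keys.
    updatesStream.foreachRDD(rdd -> {
        List<Tuple2<Long, String>> topKeys = rdd
                .mapToPair(t -> new Tuple2<String, Long>(t._1, 1L))   // (deviceKey, 1)
                .reduceByKey(Long::sum)                               // (deviceKey, count)
                .mapToPair(t -> new Tuple2<Long, String>(t._2, t._1)) // (count, deviceKey)
                .sortByKey(false)                                     // heaviest first
                .take(10);                                            // collected on the driver
        SparkExecutorLog.error("Top keys in this batch: " + topKeys);
    });

If this shows that one or two keys dominate, the second option above could be as simple as appending a small salt (say, a number from 0 to 3) to the hot keys before mapWithState and merging the salted snapshots back per device after stateSnapshots().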

Upvotes: 1
