Sathish

Data skipped while writing Spark Streaming output to HDFS

I'm running a Spark Streaming application with a 10-second batch interval. Its job is to consume data from Kafka, transform it and store it in HDFS based on the key, i.e. one file per unique key. I'm using Hadoop's saveAsHadoopFile() API to store the output, and I see that a file gets generated for every unique key, but only one row gets stored in each file even though the DStream has several rows for the same key.
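
For context, the stream is built roughly along these lines, using the Spark 1.x receiver-based KafkaUtils.createStream API. This is only a sketch: the ZooKeeper address, topic name, group id and the pass-through transform are placeholders (not the real job), and jssc is an already created JavaStreamingContext.

// Sketch only: placeholders for the actual Kafka setup and transform
Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put("input-topic", 1);

JavaPairReceiverInputDStream<String, String> kafkaStream =
        KafkaUtils.createStream(jssc, "localhost:2181", "consumer-group", topics);

JavaPairDStream<String, String> dStream = kafkaStream.mapToPair(
        new PairFunction<Tuple2<String, String>, String, String>() {
            @Override
            public Tuple2<String, String> call(Tuple2<String, String> record) {
                // the real job reshapes the value here; kept as a pass-through in this sketch
                return new Tuple2<String, String>(record._1(), record._2());
            }
        });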

For example, consider the following DStream which has one unique key,

  key                  value
 =====   =====================================
 Key_1   183.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0 
 Key_1   184.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0 
 Key_1   181.33 70.0 2.12 1.0 1.0 1.0 11.0 4.0 
 Key_1   185.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0 
 Key_1   185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0 

I see that only one row (instead of 5 rows) gets stored in the HDFS file,

185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0

The following code is used to store the output into HDFS,

dStream.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> pairRDD) throws Exception {
        // random is defined elsewhere (e.g. a java.util.Random field);
        // timestamp and randomInt give each batch its own output directory
        long timestamp = System.currentTimeMillis();
        int randomInt = random.nextInt();
        pairRDD.saveAsHadoopFile("hdfs://localhost:9000/application-" + timestamp + "-" + randomInt,
                String.class, String.class, RDDMultipleTextOutputFormat.class);
        return null;
    }
});

where the implementation of RDDMultipleTextOutputFormat is as follows,

public class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {

    @Override
    public K generateActualKey(K key, V value) {
        // return null so that only the value (not the key) is written to the file
        return null;
    }

    @Override
    public String generateFileNameForKeyValue(K key, V value, String name) {
        // name the output file after the key, i.e. one file per unique key
        return key.toString();
    }
}
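
For what it's worth, a sanity check along these lines (sketch only; collect() is only reasonable for a small test batch) should show whether each batch RDD really contains all five rows for Key_1 before saveAsHadoopFile() runs:

// Debug sketch: print every record in the batch RDD before it is saved
dStream.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> pairRDD) throws Exception {
        for (Tuple2<String, String> record : pairRDD.collect()) {
            System.out.println(record._1() + " -> " + record._2());
        }
        return null;
    }
});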

Please let me know if I'm missing anything. Thanks for your help.

Answers (1)

Kshitij Kulshrestha

Because the key is the same for every record, the value gets replaced each time, and hence you are getting only the last value supplied to Hadoop.
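
If that is the case, one possible workaround (a sketch only, reusing timestamp, randomInt and RDDMultipleTextOutputFormat from the question, and not tested against this setup) is to collapse all values for a key into a single newline-separated string before saving, so the one value written per key carries every row:

// Sketch: fold all rows for a key into one newline-separated value,
// then save as before so each key's file contains every row
JavaPairRDD<String, String> onePerKey = pairRDD.reduceByKey(
        new Function2<String, String, String>() {
            @Override
            public String call(String left, String right) {
                return left + "\n" + right;
            }
        });

onePerKey.saveAsHadoopFile("hdfs://localhost:9000/application-" + timestamp + "-" + randomInt,
        String.class, String.class, RDDMultipleTextOutputFormat.class);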
