Reputation: 5173
I'm running a Spark Streaming application with a 10-second batch interval. Its job is to consume data from Kafka, transform it, and store it into HDFS based on the key, i.e. one file per unique key. I'm using Hadoop's saveAsHadoopFile() API to store the output. A file does get generated for every unique key, but the issue is that only one row gets stored for each unique key, even though the DStream has more rows for the same key.
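For reference, the streaming context and Kafka input are set up roughly along these lines (a simplified sketch; the broker address, topic name and the StreamingJob class name are placeholders, and the actual transformation is omitted):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("kafka-to-hdfs");
        // 10-second batch interval, as described above
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "localhost:9092"); // placeholder broker
        Set<String> topics = Collections.singleton("my-topic");    // placeholder topic

        // Direct Kafka stream of (key, value) string pairs
        JavaPairInputDStream<String, String> dStream = KafkaUtils.createDirectStream(
                jssc,
                String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams, topics);

        // ... transform the records here, then write each batch with
        // dStream.foreachRDD(...) as shown further below ...

        jssc.start();
        jssc.awaitTermination();
    }
}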
For example, consider the following DStream, which has one unique key:
key value
===== =====================================
Key_1 183.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0
Key_1 184.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0
Key_1 181.33 70.0 2.12 1.0 1.0 1.0 11.0 4.0
Key_1 185.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0
Key_1 185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0
I see that only one row (instead of 5) gets stored in the HDFS file:
185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0
The following code is used to store the output into HDFS:
dStream.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> pairRDD) throws Exception {
        long timestamp = System.currentTimeMillis();
        int randomInt = random.nextInt();
        pairRDD.saveAsHadoopFile("hdfs://localhost:9000/application-" + timestamp + "-" + randomInt,
                String.class, String.class, RDDMultipleTextOutputFormat.class);
        // Function<..., Void> requires an explicit return
        return null;
    }
});
where the implementation of RDDMultipleTextOutputFormat is as follows:
public class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {

    @Override
    public K generateActualKey(K key, V value) {
        return null;
    }

    @Override
    public String generateFileNameForKeyValue(K key, V value, String name) {
        return key.toString();
    }
}
Please let me know if I'm missing anything. Thanks for your help.
Upvotes: 0
Views: 692
Reputation: 2072
Because the key is the same, every row maps to the same output file name, so the value is replaced each time and only the last value supplied to Hadoop ends up in the file.
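As a sketch of one way around this (assuming you want all rows for a key collected under a per-key directory rather than in a single file), you can include the task's part name in the generated path, so writes for the same key go to separate part files instead of replacing one file:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

public class RDDMultipleTextOutputFormat<K, V> extends MultipleTextOutputFormat<K, V> {

    @Override
    public K generateActualKey(K key, V value) {
        // Keep only the value in the written line.
        return null;
    }

    @Override
    public String generateFileNameForKeyValue(K key, V value, String name) {
        // Write to <output>/<key>/<part name>, so rows sharing a key are
        // appended to the writing task's own part file instead of every
        // row targeting the same single file.
        return key.toString() + "/" + name;
    }
}

With this, the output for the example above should look like application-<timestamp>-<randomInt>/Key_1/part-00000, part-00001, ..., with all five rows preserved across those part files.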
Upvotes: 1