vetal_king

Reputation: 63

File is overwritten while using saveAsNewAPIHadoopFile

We are using Spark 1.4 for Spark Streaming. Kafka is the data source for the stream.

Records are published to Kafka every second. Our requirement is to store the records published to Kafka in a single folder per minute. The stream reads records every five seconds. For instance, records published between 12:00 PM and 12:01 PM are stored in folder "1200", those between 12:01 PM and 12:02 PM in folder "1201", and so on.

The code I wrote is as follows:

// First, group the records in each RDD by target folder.
// (targetHadoopFolder and directory are not shown here.)
stream.foreachRDD(rddWithinStream -> {
    JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory = rddWithinStream
        .mapToPair(t -> new Tuple2<String, String>(targetHadoopFolder, t._2()))
        .groupByKey();
    // All records are now grouped by the folder they will be stored in.

    // Create an RDD for each target folder.
    for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) {
        JavaPairRDD<String, Iterable<String>> rddByKey = rddGroupedByDirectory
            .filter(groupedTuples -> groupedTuples._1().equals(hadoopFolder));

        // And store it in Hadoop.
        rddByKey.saveAsNewAPIHadoopFile(directory, String.class, String.class, TextOutputFormat.class);
    }
    return null; // foreachRDD takes a Function<..., Void> in Spark 1.4
});

Since the stream processes data every five seconds, saveAsNewAPIHadoopFile is invoked multiple times per minute. This causes the "Part-00000" file to be overwritten every time.

I was expecting that, in the directory specified by the "directory" parameter, saveAsNewAPIHadoopFile would keep creating part-0000N files even when I have a single worker node.

Any help/alternatives are greatly appreciated.

Thanks.

Upvotes: 1

Views: 1462

Answers (2)

Ajay Ahuja

Reputation: 1313

You can try this:

Split the process into two steps:

Step 1: Write the Avro file using saveAsNewAPIHadoopFile to <temp-path>
Step 2: Move the file from <temp-path> to <actual-target-path>
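
The two steps above could be sketched roughly as follows. This is only an illustration on the local filesystem using java.nio.file; the class name PartFileMover, the method movePart, and the batch-stamped target file name are assumptions made for the example, not something from the question. On HDFS the move would go through Hadoop's FileSystem.rename rather than Files.move.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class PartFileMover {
    /**
     * Moves the part file written to tempDir into targetDir under a name
     * that includes the batch time, so that later batches do not collide.
     * Hypothetical helper: assumes the job wrote exactly one "part-00000".
     */
    static Path movePart(Path tempDir, Path targetDir, long batchMillis) throws IOException {
        // Create the per-minute target folder if it does not exist yet.
        Files.createDirectories(targetDir);
        Path source = tempDir.resolve("part-00000");
        Path target = targetDir.resolve("part-" + batchMillis + "-00000");
        // Without REPLACE_EXISTING this throws if the target already exists,
        // which guards against silently overwriting an earlier batch.
        return Files.move(source, target);
    }
}
```

Because the target name carries the batch time, every five-second batch ends up as a separate file inside the same per-minute folder.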

Hope this is helpful.

Upvotes: 0

vanekjar

Reputation: 2406

In this case you have to build the output path and file name yourself. Incremental file naming works only when the output operation is called directly on the DStream, not on each RDD.

The function passed to stream.foreachRDD can receive the Time of each micro-batch. From the Spark documentation (Scala signature):

def foreachRDD(foreachFunc: (RDD[T], Time) ⇒ Unit)

So you can save each RDD as follows:

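stream.foreachRDD((rdd, time) -> {
    // timeToDirName is a helper you write yourself; it is not part of Spark
    String directory = timeToDirName(prefix, time);
    rdd.saveAsNewAPIHadoopFile(directory, String.class, String.class, TextOutputFormat.class);
    return null; // needed in Spark 1.4, where foreachRDD takes a Function2<..., Void>
});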
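
One possible shape for timeToDirName is sketched below. It is an assumption, not code from Spark or from the question: the class name BatchPaths, the "HHmm" folder pattern, the "batch-" subdirectory, and the explicit ZoneId parameter are all made up for illustration. To stay self-contained it takes the batch time as epoch milliseconds, which you can get from Spark's Time via time.milliseconds().

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class BatchPaths {
    // Folder name per minute, e.g. "1201" for 12:01.
    private static final DateTimeFormatter MINUTE = DateTimeFormatter.ofPattern("HHmm");

    /**
     * Builds an output directory that is unique per micro-batch but grouped
     * under one folder per minute: <prefix>/<HHmm>/batch-<millis>.
     * Hypothetical helper; the layout is only one of several reasonable choices.
     */
    static String timeToDirName(String prefix, long batchMillis, ZoneId zone) {
        String minuteFolder = MINUTE.format(Instant.ofEpochMilli(batchMillis).atZone(zone));
        // Each batch gets its own subdirectory, so part files from
        // different batches never share a path.
        return prefix + "/" + minuteFolder + "/batch-" + batchMillis;
    }
}
```

Inside foreachRDD this would be called as something like BatchPaths.timeToDirName(prefix, time.milliseconds(), ZoneId.systemDefault()). All batches from the same minute then land under the same minute folder, each in its own subdirectory.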

Upvotes: 1
