rakesh

Reputation: 2051

How to get the file name from DStream of Spark StreamingContext?

Even after many attempts and a lot of googling, I could not get the file name when using the streaming context. I could use wholeTextFiles of SparkContext, but then I would have to re-implement the streaming context's functionality.

Note: The files (error events stored as JSON files) are the input to the system, so retaining the file name in the output is extremely important so that any event can be traced during an audit.

Note: The file name has the format below. The SerialNumber part can be extracted from the event JSON, but the time is stored as milliseconds and is difficult to convert to the format below reliably, and there is no way to recover the counter. ...

Each file contains just one line, a complex JSON string. Using the streaming context I am able to create an RDD[String], where each string is the JSON string from a single file. Does anyone have a solution or workaround for associating each string with its file name?

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc = new SparkContext("local[*]", "test")
val ssc = new StreamingContext(sc, Seconds(4))
val dStream = ssc.textFileStream(pathOfDirToStream)
dStream.foreachRDD { eventsRdd => /* How to get the file name */ }

Upvotes: 2

Views: 1868

Answers (2)

Sandhya

Reputation: 1

Hi, to get file names from a DStream I created a Java function that fetches the file path using the Java Spark API, and then called that function from my Spark Streaming job (which is written in Scala). Here is the Java code sample:

import java.io.Serializable;
import org.apache.spark.Partition;
import org.apache.spark.rdd.NewHadoopPartition;
import org.apache.spark.rdd.UnionPartition;

public class GetFileNameFromStream implements Serializable {

    // Returns the string form of the Hadoop input split behind a partition
    // of the RDD produced by a file stream.
    public String getFileName(Partition partition) {
        // A file-stream batch is a union of per-file RDDs, so unwrap the union first.
        UnionPartition upp = (UnionPartition) partition;
        NewHadoopPartition npp = (NewHadoopPartition) upp.parentPartition();
        return npp.serializableHadoopSplit().value().toString();
    }
}

In the Spark Streaming job, I call the above Java function. Here is a code sample:

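val obj = new GetFileNameFromStream
dstream.transform { rdd =>
  // Walk the partitions of this batch; each one wraps a Hadoop input split.
  for (part <- rdd.partitions) {
    val filePath = obj.getFileName(part)
    // use filePath here, e.g. log it or collect it for this batch
  }
  rdd // transform must return an RDD
}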
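
The string returned by getFileName is the split's toString, not a bare file name. For a Hadoop FileSplit this is normally the path followed by ":start+length". A minimal sketch for recovering the path and the file name from such a string, assuming that format holds for your input splits (the object SplitDescription and the sample path are hypothetical):

```scala
// Hypothetical helper: extracts the path and file name from the string form
// of a Hadoop FileSplit, assumed to look like "<path>:<start>+<length>".
object SplitDescription {
  // Matches the trailing ":<start>+<length>" part.
  private val Suffix = """:\d+\+\d+$""".r

  // Full path with the split offsets removed.
  def filePath(split: String): String = Suffix.replaceFirstIn(split, "")

  // Last path component, i.e. the file name itself.
  def fileName(split: String): String = {
    val path = filePath(split)
    path.substring(path.lastIndexOf('/') + 1)
  }
}
```

For example, SplitDescription.fileName("file:/data/in/event-1.json:0+2048") gives "event-1.json".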

Upvotes: 0

Hamel Kothari

Reputation: 737

You could do this by creating your own FileInputFormat, similar to TextInputFormat, that uses the InputSplit to provide the file name as the key. Then you can pass it to fileStream to get a DStream of (file name, line) pairs.
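
A minimal sketch of this idea in Scala, assuming the new Hadoop API (org.apache.hadoop.mapreduce) and file-based splits. The names FileNameLineRecordReader, FileNameTextInputFormat and FileNameStream are hypothetical, not part of Spark or Hadoop. The reader delegates line reading to Hadoop's LineRecordReader and only replaces the key with the file name:

```scala
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit, LineRecordReader}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical reader: key = file name, value = one line of the file.
class FileNameLineRecordReader extends RecordReader[Text, Text] {
  private val lines = new LineRecordReader()
  private val fileName = new Text()

  override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {
    // Assumes the split is a FileSplit, as produced by FileInputFormat.
    fileName.set(split.asInstanceOf[FileSplit].getPath.getName)
    lines.initialize(split, context)
  }

  override def nextKeyValue(): Boolean = lines.nextKeyValue()
  override def getCurrentKey: Text = fileName
  override def getCurrentValue: Text = lines.getCurrentValue
  override def getProgress: Float = lines.getProgress
  override def close(): Unit = lines.close()
}

// Hypothetical input format that plugs the reader above into fileStream.
class FileNameTextInputFormat extends FileInputFormat[Text, Text] {
  override def createRecordReader(
      split: InputSplit,
      context: TaskAttemptContext): RecordReader[Text, Text] =
    new FileNameLineRecordReader
}

object FileNameStream {
  // Builds a stream of (file name, line) pairs for new files in `dir`.
  def apply(ssc: StreamingContext, dir: String): DStream[(String, String)] =
    ssc.fileStream[Text, Text, FileNameTextInputFormat](dir)
      // Hadoop reuses Text instances, so copy them into Strings right away.
      .map { case (name, line) => (name.toString, line.toString) }
}
```

With the values from the question this would be called as FileNameStream(ssc, pathOfDirToStream). Using getPath.getName keeps only the last path component; getPath.toString would keep the full path instead.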

Upvotes: 0
