Vale

Reputation: 1124

Writing a sequence file from an image in local to HDFS using Java and Spark

As the title says, that is my objective now.

The reason I'm using Spark is scalability (there are thousands of files to process and I will have a cluster of workers available), and because I'm thinking of implementing a Spark Streaming receiver on the image directory, so that new files are processed automatically. Here is my initial code:

JavaPairRDD<String, String> imageRDD = jsc.wholeTextFiles("file:///home/cloudera/Pictures/");

imageRDD.mapToPair(new PairFunction<Tuple2<String, String>, Text, Text>() {

    @Override
    public Tuple2<Text, Text> call(Tuple2<String, String> arg0) throws Exception {
        return new Tuple2<Text, Text>(new Text(arg0._1), new Text(arg0._2));
    }

}).saveAsNewAPIHadoopFile("hdfs://localhost:8020/user/hdfs/sparkling/try.seq",
        Text.class, Text.class, SequenceFileOutputFormat.class);

Here I load the images as text files and create tuples with the Text type from the Hadoop library. This works, but:

  1. The file isn't saved as a single one, but as a folder containing the partitions (a possible workaround is sketched right after this list).
  2. It isn't a byte array, but a text representation of the file. We all know how painful it can be to convert it back from text to an image (or whatever it is).
  3. If I load the files like this, will there be a way to extract the required information?
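For point 1, the closest thing I can think of is coalescing to a single partition before saving. This is only a sketch, reusing the imageRDD and pair function from above and not tested at scale: Spark still writes a directory, but it would contain only one part file.

JavaPairRDD<String, String> imageRDD = jsc.wholeTextFiles("file:///home/cloudera/Pictures/");

imageRDD.mapToPair(new PairFunction<Tuple2<String, String>, Text, Text>() {

    @Override
    public Tuple2<Text, Text> call(Tuple2<String, String> arg0) throws Exception {
        return new Tuple2<Text, Text>(new Text(arg0._1), new Text(arg0._2));
    }

}).coalesce(1)   // one partition => a single part-00000 inside the output folder
  .saveAsNewAPIHadoopFile("hdfs://localhost:8020/user/hdfs/sparkling/try.seq",
        Text.class, Text.class, SequenceFileOutputFormat.class);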

I've also tried loading the files with sparkContext.binaryFiles(<directory>), but I'm always lost on how to extract the info and on how to save them.
I can't seem to find the answer on the internet: does anybody know something about this?
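For reference, this is roughly how far I get with binaryFiles() before getting stuck (same jsc as above); each entry is a (path, PortableDataStream) pair and toArray() gives the raw bytes:

JavaPairRDD<String, PortableDataStream> binaries =
        jsc.binaryFiles("file:///home/cloudera/Pictures/");

binaries.foreach(new VoidFunction<Tuple2<String, PortableDataStream>>() {

    @Override
    public void call(Tuple2<String, PortableDataStream> file) throws Exception {
        byte[] content = file._2().toArray();   // raw image bytes
        System.out.println(file._1() + " -> " + content.length + " bytes");
    }
});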

Upvotes: 0

Views: 955

Answers (1)

Vale

Reputation: 1124

Here is how I did this:

JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(SOURCE_PATH);

if (!imageByteRDD.isEmpty()) {
    imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, PortableDataStream>>>() {

        @Override
        public void call(Iterator<Tuple2<String, PortableDataStream>> arg0) throws Exception {
            // One Hadoop configuration per partition, pointing at the HDFS namenode
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", HDFS_PATH);

            while (arg0.hasNext()) {
                Tuple2<String, PortableDataStream> fileTuple = arg0.next();
                Text key = new Text(fileTuple._1());

                // Last path component, e.g. "picture.jpg", split into name and extension
                String[] pathParts = key.toString().split(SEP_PATH);
                String[] nameParts = pathParts[pathParts.length - 1].split(DOT_REGEX);
                String fileName = nameParts[0];
                String fileExtension = nameParts[nameParts.length - 1];

                // The whole file content becomes the record value
                BytesWritable value = new BytesWritable(fileTuple._2().toArray());

                // One SequenceFile per image: <fileName>_<timestamp>.<extension>
                SequenceFile.Writer writer = SequenceFile.createWriter(
                        conf,
                        SequenceFile.Writer.file(new Path(DEST_PATH + fileName + SEP_KEY + getCurrentTimeStamp() + DOT + fileExtension)),
                        SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, new BZip2Codec()),
                        SequenceFile.Writer.keyClass(Text.class),
                        SequenceFile.Writer.valueClass(BytesWritable.class));

                // Key: <parentDir>_<fileName>_<extension>
                key = new Text(pathParts[pathParts.length - 2] + SEP_KEY + fileName + SEP_KEY + fileExtension);
                writer.append(key, value);
                IOUtils.closeStream(writer);
            }
        }
    });
}
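To check the result, the files can be read back into an RDD with the same key/value classes used above. This is just a minimal sketch, assuming the same jsc and that DEST_PATH is the fully qualified output directory; copyBytes() is there because Hadoop reuses the BytesWritable buffer between records:

JavaPairRDD<Text, BytesWritable> readBack =
        jsc.sequenceFile(DEST_PATH, Text.class, BytesWritable.class);

JavaPairRDD<String, byte[]> images = readBack.mapToPair(
        new PairFunction<Tuple2<Text, BytesWritable>, String, byte[]>() {

            @Override
            public Tuple2<String, byte[]> call(Tuple2<Text, BytesWritable> record) throws Exception {
                // Detach key and bytes from Hadoop's reused Writable instances
                return new Tuple2<String, byte[]>(record._1().toString(), record._2().copyBytes());
            }
        });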

Upvotes: 1
