Swaranga Sarma

Reputation: 13393

Implementing custom Spark RDD in Java

I have a custom data source and I want to load the data into my Spark cluster to perform some computations. For this I see that I might need to implement a new RDD for my data source.

I am a complete Scala noob and I am hoping that I can implement the RDD in Java itself. I looked around the internet and could not find any resources. Any pointers?

My data is in S3 and is indexed in DynamoDB. For example, if I want to load data for a given time range, I first need to query Dynamo for the S3 file keys for that time range and then load those files in Spark. The files will not always share the same S3 path prefix, so sc.textFile("s3://directory_path/") won't work.

I am looking for pointers on how to implement something analogous to HadoopRDD or JdbcRDD, but in Java. Something similar to what they have done here: DynamoDBRDD. That one reads its data from Dynamo; my custom RDD would query DynamoDB for the S3 file keys and then load the corresponding objects from S3.
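
To make the shape of what I want more concrete, this is roughly the flow I have in mind today (the lookup helper, bucket names, and paths below are made up for illustration):

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class TimeRangeLoadSketch {
    public static void main(final String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("TimeRangeLoad");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // 1. Ask the Dynamo index for the S3 objects covering the time range.
            List<String> s3Paths = lookupS3PathsForRange("2015-06-01T00:00Z", "2015-06-02T00:00Z");

            // 2. Load them. textFile accepts a comma-separated list of paths, but I would
            //    rather have a single RDD type that encapsulates both steps.
            JavaRDD<String> records = sc.textFile(String.join(",", s3Paths));
            System.out.println(records.count());
        }
    }

    // Placeholder for the real DynamoDB query; hard-coded paths for illustration only.
    private static List<String> lookupS3PathsForRange(final String from, final String to) {
        return Arrays.asList("s3://my-bucket/2015/06/01/part-0001",
                "s3://my-bucket/other-prefix/part-0042");
    }
}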

Upvotes: 10

Views: 7681

Answers (2)

DanyalBurke

Reputation: 161

You can extend RDD in Java and implement the getPartitions and compute methods.

Java can extend Scala classes with some limitations.

Example:

package com.openmarket.danyal;
// Other imports left out
import org.apache.spark.Dependency;
import org.apache.spark.Partition;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;

import scala.collection.AbstractIterator;
import scala.collection.Iterator;
import scala.collection.mutable.ArrayBuffer;
import scala.reflect.ClassManifestFactory$;
import scala.reflect.ClassTag;

public class AlphaTest {
    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);

    public static void main(final String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Learn ABCs");
        try(JavaSparkContext sc = new JavaSparkContext(conf)) {
            System.out.println(new AlphabetRDD(sc.sc()).toJavaRDD().collect());
        }
    }

    public static class AlphabetRDD extends RDD<String> {
        private static final long serialVersionUID = 1L;

        public AlphabetRDD(SparkContext sc) {
            super(sc, new ArrayBuffer<Dependency<?>>(), STRING_TAG);
        }

        @Override
        public Iterator<String> compute(Partition arg0, TaskContext arg1) {
            AlphabetRangePartition p = (AlphabetRangePartition)arg0;
            return new CharacterIterator(p.from, p.to);
        }

        @Override
        public Partition[] getPartitions() {
            return new Partition[] {new AlphabetRangePartition(1, 'A', 'M'), new AlphabetRangePartition(2, 'N', 'Z')};
        }

    }

    /**
     * A partition representing letters of the Alphabet between a range
     */
    public static class AlphabetRangePartition implements Partition {
        private static final long serialVersionUID = 1L;
        private int index;
        private char from;
        private char to;

        public AlphabetRangePartition(int index, char c, char d) {
            this.index = index;
            this.from = c;
            this.to = d;
        }

        @Override
        public int index() {
            return index;
        }

        @Override
        public boolean equals(Object obj) {
            if(!(obj instanceof AlphabetRangePartition)) {
                return false;
            }
            return ((AlphabetRangePartition)obj).index == index;
        }

        @Override
        public int hashCode() {
            return index();
        }
    }

    /**
     * Iterates over all the characters between two characters (inclusive)
     */
    public static class CharacterIterator extends AbstractIterator<String> {
        private char next;
        private char last;

        public CharacterIterator(char from, char to) {
            next = from;
            this.last = to;
        }

        @Override
        public boolean hasNext() {
            return next <= last;
        }

        @Override
        public String next() {
            // Return the current character, then post-increment to the next one
            return Character.toString(next++);
        }
    }
}
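
To map this onto your Dynamo/S3 case, the rough idea would be: getPartitions queries the Dynamo index and emits one Partition per S3 key, and compute reads that single object. The sketch below sits alongside the classes above (same imports and the same STRING_TAG constant); fetchS3KeysFromDynamo and readS3Lines are made-up placeholders for the real DynamoDB and S3 SDK calls, not existing APIs.

    /**
     * Sketch of the same pattern for the Dynamo-indexed S3 case. The two private
     * helpers are placeholders for the real DynamoDB and S3 SDK calls.
     */
    public static class S3ByTimeRangeRDD extends RDD<String> {
        private static final long serialVersionUID = 1L;
        private final long fromMillis;
        private final long toMillis;

        public S3ByTimeRangeRDD(SparkContext sc, long fromMillis, long toMillis) {
            super(sc, new ArrayBuffer<Dependency<?>>(), STRING_TAG);
            this.fromMillis = fromMillis;
            this.toMillis = toMillis;
        }

        @Override
        public Partition[] getPartitions() {
            // One partition per S3 object that the Dynamo index returns for the range.
            java.util.List<String> keys = fetchS3KeysFromDynamo(fromMillis, toMillis);
            Partition[] partitions = new Partition[keys.size()];
            for (int i = 0; i < keys.size(); i++) {
                partitions[i] = new S3KeyPartition(i, keys.get(i));
            }
            return partitions;
        }

        @Override
        public Iterator<String> compute(Partition split, TaskContext context) {
            S3KeyPartition p = (S3KeyPartition) split;
            final java.util.Iterator<String> lines = readS3Lines(p.key);
            // Adapt a plain java.util.Iterator to the scala.collection.Iterator Spark expects.
            return new AbstractIterator<String>() {
                @Override
                public boolean hasNext() {
                    return lines.hasNext();
                }

                @Override
                public String next() {
                    return lines.next();
                }
            };
        }

        // Placeholder: the real version would query the DynamoDB index for the time range.
        private static java.util.List<String> fetchS3KeysFromDynamo(long from, long to) {
            return java.util.Arrays.asList("my-bucket/2015/06/01/part-0001",
                    "my-bucket/other-prefix/part-0042");
        }

        // Placeholder: the real version would open the S3 object and stream its lines.
        private static java.util.Iterator<String> readS3Lines(String key) {
            return java.util.Arrays.asList("line 1 of " + key, "line 2 of " + key).iterator();
        }
    }

    /**
     * One S3 object per partition. As with AlphabetRangePartition, a real version
     * should also override equals and hashCode.
     */
    public static class S3KeyPartition implements Partition {
        private static final long serialVersionUID = 1L;
        private final int index;
        final String key;

        public S3KeyPartition(int index, String key) {
            this.index = index;
            this.key = key;
        }

        @Override
        public int index() {
            return index;
        }
    }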

Upvotes: 13

Holden

Reputation: 7452

One option is reading the Hadoop specs, but if your data is structured, Spark SQL has a new Data Sources API, some implementations of which are posted on Spark Packages, including avro, redshift, and csv.
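
For example, with Spark 1.4+ and the spark-csv package from Spark Packages on the classpath (the bucket and path below are just placeholders), reading structured data through that API from Java looks roughly like this:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class DataSourcesApiExample {
    public static void main(final String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DataSourcesApiExample");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            SQLContext sqlContext = new SQLContext(sc);
            // Read a CSV file through the spark-csv data source
            // (the com.databricks:spark-csv package must be on the classpath).
            DataFrame df = sqlContext.read()
                    .format("com.databricks.spark.csv")
                    .option("header", "true")
                    .load("s3://my-bucket/some-prefix/data.csv"); // placeholder path
            df.show();
        }
    }
}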

Upvotes: 1
