user3082806

Reputation: 33

Moving Average in Spark Java

I have real-time streaming data coming into Spark, and I would like to do moving average forecasting on that time-series data. Is there any way to implement this using Spark in Java?

I've already looked at https://gist.github.com/samklr/27411098f04fc46dcd05/revisions and the question "Apache Spark Moving Average", but both are written in Scala. Since I'm not familiar with Scala, I can't judge whether they would be useful, let alone convert the code to Java. Is there any direct implementation of forecasting in Spark Java?

Upvotes: 1

Views: 3093

Answers (1)

Victor

Reputation: 2546

I took the question you were referring to and struggled for a couple of hours to translate its Scala code into Java:

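import java.util.Date;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.rdd.RDDFunctions;
import org.apache.spark.rdd.RDD;

import scala.Tuple2;
import scala.reflect.ClassTag;

// Read a file containing the stock quotations (sc is a JavaSparkContext).
// You can also parallelize a collection of objects to create an RDD.
JavaRDD<String> linesRDD = sc.textFile("some sample file containing stock prices");

// Convert the lines into our business objects. They must already be in timestamp order:
// sliding() follows the RDD's ordering (partition index, then position within the partition).
JavaRDD<StockQuotation> quotationsRDD = linesRDD.flatMap(new ConvertLineToStockQuotation());

// We need these two objects in order to use the MLlib RDDFunctions object
ClassTag<StockQuotation> classTag = scala.reflect.ClassManifestFactory.fromClass(StockQuotation.class);
RDD<StockQuotation> rdd = JavaRDD.toRDD(quotationsRDD);

// Instantiate an RDDFunctions object to work with
RDDFunctions<StockQuotation> rddFs = RDDFunctions.fromRDD(rdd, classTag);

// Example window size (number of quotations per average); choose one that fits your data
int slidingWindow = 3;

// Apply the sliding function and return one (DATE, SMA) tuple per window
JavaPairRDD<Date, Double> smaPerDate = rddFs.sliding(slidingWindow).toJavaRDD().mapToPair(new MovingAvgByDateFunction());
List<Tuple2<Date, Double>> smaPerDateList = smaPerDate.collect();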
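The driver code relies on a StockQuotation class and a ConvertLineToStockQuotation function that the answer does not show. A minimal sketch of the data class follows, assuming (hypothetically) that each input line looks like `2015-03-02,105.3` (date, closing price). The class name and getters match the ones used above; the constructor, the `parse` helper and the line format are illustrative assumptions. A Spark FlatMapFunction could delegate to `parse`, which returns an empty list for malformed lines; that is why `flatMap` rather than `map` fits here.

```java
import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.List;

// Hypothetical data class exposing the getters used by the driver and the window function.
// Spark ships RDD elements between JVMs, so it must be Serializable.
class StockQuotation implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Date timestamp;
    private final double value;

    StockQuotation(Date timestamp, double value) {
        this.timestamp = timestamp;
        this.value = value;
    }

    public Date getTimestamp() {
        return timestamp;
    }

    public double getValue() {
        return value;
    }

    // Hypothetical parser for lines shaped like "2015-03-02,105.3".
    // Malformed lines yield an empty list instead of throwing.
    static List<StockQuotation> parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 2) {
            return Collections.emptyList();
        }
        try {
            // SimpleDateFormat is not thread-safe, so create one per call
            Date date = new SimpleDateFormat("yyyy-MM-dd").parse(parts[0].trim());
            double price = Double.parseDouble(parts[1].trim());
            return Collections.singletonList(new StockQuotation(date, price));
        } catch (ParseException | NumberFormatException e) {
            return Collections.emptyList();
        }
    }
}
```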

Then you need a new function class that does the actual calculation for each data window:

import java.util.Arrays;
import java.util.Date;

import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

// Receives one sliding window of quotations and returns (window date, average value)
public class MovingAvgByDateFunction implements PairFunction<Object, Date, Double> {

    private static final long serialVersionUID = 9220435667459839141L;

    @Override
    public Tuple2<Date, Double> call(Object t) throws Exception {
        // sliding() produces RDD[Array[T]]; from Java each element is typed as Object,
        // but at runtime it is a StockQuotation[] because we passed the matching ClassTag
        StockQuotation[] stocks = (StockQuotation[]) t;

        // Simple moving average: arithmetic mean of the values in this window
        double average = Arrays.stream(stocks)
                .mapToDouble(StockQuotation::getValue)
                .sum() / stocks.length;

        // Key the result by the timestamp of the first quotation in the window
        return new Tuple2<>(stocks[0].getTimestamp(), average);
    }
}
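To check what `sliding(slidingWindow)` plus the averaging function produce, here is a small Spark-free sketch of the same computation on a plain array of prices. The class and method names are hypothetical. Each window holds `windowSize` consecutive values and advances by one element, giving `values.length - windowSize + 1` averages; like MLlib's `sliding()`, it yields nothing when the window is larger than the input. Unlike the Spark function above, which sums each window separately, this sketch keeps a running sum (add the value entering the window, subtract the one leaving), so it does O(n) work instead of O(n × windowSize).

```java
import java.util.ArrayList;
import java.util.List;

// Local illustration of "sliding window, then average each window" without Spark
final class SlidingAverage {

    private SlidingAverage() {
    }

    // One simple moving average per window of windowSize consecutive values
    static List<Double> simpleMovingAverages(double[] values, int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        List<Double> result = new ArrayList<>();
        if (windowSize > values.length) {
            return result; // no complete window fits
        }
        // Sum of the first window
        double sum = 0.0;
        for (int i = 0; i < windowSize; i++) {
            sum += values[i];
        }
        result.add(sum / windowSize);
        // Slide one step at a time: add the entering value, drop the leaving one
        for (int i = windowSize; i < values.length; i++) {
            sum += values[i] - values[i - windowSize];
            result.add(sum / windowSize);
        }
        return result;
    }

    public static void main(String[] args) {
        double[] closingPrices = {10.0, 11.0, 12.0, 13.0, 14.0};
        // Windows: [10, 11, 12], [11, 12, 13], [12, 13, 14]
        System.out.println(simpleMovingAverages(closingPrices, 3)); // prints [11.0, 12.0, 13.0]
    }
}
```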

If you want more detail on this, I wrote about Simple Moving Averages here: https://t.co/gmWltdANd3

Upvotes: 3
