lserlohn

efficient way to reformat/shift time series data using Spark

I want to build some time series models using Spark. The first step is to reformat the sequence data into training samples. The idea is:

original sequential data (each t* is a number)

t1  t2  t3  t4  t5  t6  t7  t8  t9  t10

desired output

t1  t2  t3  t4  t5  t6
t2  t3  t4  t5  t6  t7
t3  t4  t5  t6  t7  t8
..................

How do I write a function in Spark to do this? The signature should be something like

reformat(Array[Integer], n: Integer)

and the return type should be a DataFrame or a Vector.

========== The code I tried on Spark 1.6.1 ==========

val arraydata = Array[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val slideddata = arraydata.sliding(4).toSeq                // overlapping windows of length 4
val rows = arraydata.sliding(4).map { x => Row(x: _*) }    // one Row per window
sc.parallelize(arraydata.sliding(4).toSeq).toDF("Values")  // fails, see below

The final line does not compile; it fails with:

Error:(52, 48) value toDF is not a member of org.apache.spark.rdd.RDD[Array[Double]]
    sc.parallelize(arraydata.sliding(4).toSeq).toDF("Values")
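
As far as I can tell, the error occurs because in Spark 1.6 sqlContext.implicits._ only provides a toDF conversion for RDDs of Products (tuples, case classes) and a few primitive types, and Array[Double] is neither. A minimal workaround under that assumption is to wrap each window in a Tuple1, which is a Product, so each array becomes a single array-typed column:

import sqlContext.implicits._

val arraydata = Array[Double](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Tuple1 makes each window a Product, so the implicit toDF conversion applies;
// Array[Double] is inferred as an ArrayType(DoubleType) column.
sc.parallelize(arraydata.sliding(4).map(Tuple1.apply).toSeq).toDF("Values")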


Answers (1)

Shivansh

I couldn't tell from the question whether n is meant to be the window size or the amount to shift by, so here are both flavours.

If n is the window size:

def reformat(arrayOfInteger: Array[Int], windowSize: Int) = {
  // every overlapping window of windowSize elements becomes one row
  sc.parallelize(arrayOfInteger.sliding(windowSize).toSeq).toDF("values")
}

On REPL:

scala> def reformat(arrayOfInteger: Array[Int], windowSize: Int) = {
     |   sc.parallelize(arrayOfInteger.sliding(windowSize).toSeq).toDF("values")
     | }
reformat: (arrayOfInteger: Array[Int], windowSize: Int)org.apache.spark.sql.DataFrame

scala> val arrayofInteger=(1 to 10).toArray
arrayofInteger: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> reformat(arrayofInteger,3).show
+----------+
|    values|
+----------+
| [1, 2, 3]|
| [2, 3, 4]|
| [3, 4, 5]|
| [4, 5, 6]|
| [5, 6, 7]|
| [6, 7, 8]|
| [7, 8, 9]|
|[8, 9, 10]|
+----------+
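
As an aside: Scala's sliding also accepts a step argument, so non-overlapping (tumbling) windows need no extra machinery. A small sketch:

// step == size gives non-overlapping windows; Scala keeps the shorter
// trailing group (Array(10)) rather than dropping it
val tumbling = (1 to 10).toArray.sliding(3, 3).toSeq
// windows: Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9), Array(10)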

If n is the shift value:

def reformat(arrayOfInteger: Array[Int], shiftValue: Int) = {
  // the window size is the array length minus the shift
  val slidingValue = arrayOfInteger.size - shiftValue
  sc.parallelize(arrayOfInteger.sliding(slidingValue).toSeq).toDF("values")
}

On REPL:

scala> def reformat(arrayOfInteger: Array[Int], shiftValue: Int) = {
     |   val slidingValue = arrayOfInteger.size - shiftValue
     |   sc.parallelize(arrayOfInteger.sliding(slidingValue).toSeq).toDF("values")
     | }
reformat: (arrayOfInteger: Array[Int], shiftValue: Int)org.apache.spark.sql.DataFrame

scala> val arrayofInteger=(1 to 10).toArray
arrayofInteger: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> reformat(arrayofInteger,3).show(false)
+----------------------+
|values                |
+----------------------+
|[1, 2, 3, 4, 5, 6, 7] |
|[2, 3, 4, 5, 6, 7, 8] |
|[3, 4, 5, 6, 7, 8, 9] |
|[4, 5, 6, 7, 8, 9, 10]|
+----------------------+
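
One caveat with both flavours: sc.parallelize starts from a local array, so the whole series has to fit on the driver. If the series already lives in a DataFrame, the same reshaping can be done with window functions instead (on Spark 1.x this needs a HiveContext; a plain SQLContext doesn't support them there). A rough sketch, where the DataFrame df and its columns time and value are assumptions for illustration:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lead}

val n = 6
// No partitionBy: Spark will warn and collect the series into a single
// partition, which is acceptable for one series but not for many.
val w = Window.orderBy("time")

// Row k becomes the sample (t_k, t_{k+1}, ..., t_{k+n-1}); the trailing
// rows have incomplete windows (nulls), so drop them.
val samples = df
  .select((0 until n).map(i => lead(col("value"), i).over(w).alias(s"t${i + 1}")): _*)
  .na.drop()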
