mogli

Reputation: 1609

Rolling Time Window Data in scala

Please find below a simplified Scala code snippet that builds a sample Day -> Data mapping and attempts to compute rolling time-window data over 3 days:

import scala.collection.immutable.TreeMap

// Day1 -> Data-1, ..., Day7 -> Data-7, kept sorted by key
val dataByDay: Map[String, String] = TreeMap((1 to 7).map(i => (s"Day$i" -> s"Data-$i")): _*)

// sliding(3) yields only full windows of 3 consecutive entries; pair each with a 1-based day number
val groupedIterator: Iterator[(Int, Map[String, String])] = dataByDay.sliding(3).zipWithIndex.map(e => ((e._2 + 1) -> e._1))

for ((day, lastThreeDaysData) <- groupedIterator) {
  println(s"On Day${day} data for days " + lastThreeDaysData.keys.mkString(",") + " will be used")
}

The output of the above is:

On Day1 data for days Day1,Day2,Day3 will be used
On Day2 data for days Day2,Day3,Day4 will be used
On Day3 data for days Day3,Day4,Day5 will be used
On Day4 data for days Day4,Day5,Day6 will be used
On Day5 data for days Day5,Day6,Day7 will be used

The requirement is to process the data as shown below, where each day uses only the data of (up to) the three preceding days:

On Day1 data for days will be used
On Day2 data for days Day1 will be used
On Day3 data for days Day1,Day2 will be used
On Day4 data for days Day1,Day2,Day3 will be used
On Day5 data for days Day2,Day3,Day4 will be used
On Day6 data for days Day3,Day4,Day5 will be used
On Day7 data for days Day4,Day5,Day6 will be used

Kindly suggest.

Upvotes: 0

Views: 642

Answers (2)

sarveshseri

Reputation: 13985

I am assuming that this code is just for the purpose of this question and your actual requirement is a bit different.

I am providing a solution for streams; you can use something similar to the following to get this special windowing for your use case.

import scala.collection.mutable

// An infinite stream of (day, data) pairs: (Day1,Data1), (Day2,Data2), ...
val stream = {
  def loop(i: Int): Stream[(String, String)] = (s"Day$i", s"Data$i") #:: loop(i + 1)
  loop(1)
}

// For every element of `source`, emits the last `window` elements seen so far,
// including the current one. Assumes `source` is infinite (head of an empty Stream throws).
def specialWindowedStream[T](source: Stream[T], window: Int): Stream[List[T]] = {
  val queue = mutable.Queue.empty[T]
  def loop(source: Stream[T]): Stream[List[T]] = {
    queue.enqueue(source.head)
    // drop the oldest element once the window is over capacity
    if (queue.size > window) {
      queue.dequeue()
    }
    queue.toList #:: loop(source.tail)
  }

  loop(source)
}

val windowedStream = specialWindowedStream(stream, 5)

windowedStream.zipWithIndex.take(6).foreach(println)
// (List((Day1,Data1)),0)
// (List((Day1,Data1), (Day2,Data2)),1)
// (List((Day1,Data1), (Day2,Data2), (Day3,Data3)),2)
// (List((Day1,Data1), (Day2,Data2), (Day3,Data3), (Day4,Data4)),3)
// (List((Day1,Data1), (Day2,Data2), (Day3,Data3), (Day4,Data4), (Day5,Data5)),4)
// (List((Day2,Data2), (Day3,Data3), (Day4,Data4), (Day5,Data5), (Day6,Data6)),5)
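Since Scala 2.13, `Stream` is deprecated in favour of `LazyList`. A minimal sketch of the same idea without the shared mutable queue, assuming Scala 2.13+, threads the window through `scanLeft`. Unlike the code above, it pairs each element with the (up to) `window` elements *before* it, excluding the current one, which matches the output the question asks for. `PrecedingWindowDemo` and `precedingWindows` are illustrative names, not library API.

```scala
object PrecedingWindowDemo {
  // Pairs every element with (at most) the `window` elements that precede it.
  // scanLeft carries an immutable Vector along the stream, so no mutable state
  // is shared; LazyList keeps it lazy, so an infinite source is fine as long
  // as only a finite prefix is forced.
  def precedingWindows[T](source: LazyList[T], window: Int): LazyList[(T, Vector[T])] = {
    // history(i) holds the last `window` elements strictly before source(i)
    val history: LazyList[Vector[T]] =
      source.scanLeft(Vector.empty[T])((acc, x) => (acc :+ x).takeRight(window))
    source.zip(history)
  }

  def main(args: Array[String]): Unit = {
    val days: LazyList[(String, String)] =
      LazyList.from(1).map(i => (s"Day$i", s"Data$i"))

    precedingWindows(days, 3).take(7).foreach { case ((day, _), prior) =>
      println(s"On $day data for days ${prior.map(_._1).mkString(",")} will be used")
    }
  }
}
```

`history` has one more element than `source` (the initial empty window), and `zip` simply stops at the shorter of the two.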

Upvotes: 0

jwvh

Reputation: 51271

Your requirements are a bit vague. If you just need that output, then a simple solution is something like this:

(1 to 7).foreach { day =>
  // the (up to) three preceding days, dropping non-existent days <= 0
  val prior = Seq(day - 3, day - 2, day - 1).filter(_ > 0).map("Day" + _)
  println(s"On Day$day data for days ${prior.mkString(",")} will be used")
}
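If the windows should carry the actual map entries rather than just day names, one way to keep the question's `sliding(3)` approach is to pad the front of the sequence with empty slots, so that `sliding` also yields the leading partial windows. A sketch, assuming the data is a `TreeMap` ordered by day and `window >= 1`; `PaddedSliding` and `precedingBySliding` are hypothetical names.

```scala
import scala.collection.immutable.TreeMap

object PaddedSliding {
  // Hypothetical helper: prepends `window` empty slots so that sliding(window)
  // also produces the partial windows for the first days.
  def precedingBySliding[K, V](data: TreeMap[K, V], window: Int): List[(K, List[(K, V)])] = {
    val padded: List[Option[(K, V)]] = List.fill(window)(None) ++ data.toList.map(Some(_))
    // Window i covers the `window` entries just before key i; flatten drops the padding.
    val windows = padded.sliding(window).map(_.flatten).toList
    // There is one more window than keys; zip discards the trailing one.
    data.keys.toList.zip(windows)
  }

  def main(args: Array[String]): Unit = {
    val dataByDay = TreeMap((1 to 7).map(i => s"Day$i" -> s"Data-$i"): _*)
    precedingBySliding(dataByDay, 3).foreach { case (day, prior) =>
      println(s"On $day data for days ${prior.map(_._1).mkString(",")} will be used")
    }
  }
}
```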

If the requirement is a data representation of a configurable rolling window, then a little more precise information is needed.
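For a configurable window over the question's `TreeMap`, one option, assuming Scala 2.13 (where sorted maps provide `rangeUntil`) and that the map's key order is chronological, is to take everything before each key and keep the last `window` entries. `ConfigurableWindow` and `precedingWindows` are illustrative names. Note that string keys such as `Day10` sort before `Day2`, so real data would want date or numeric keys.

```scala
import scala.collection.immutable.TreeMap

object ConfigurableWindow {
  // For each key, the entries of the (at most) `window` keys strictly before it.
  // rangeUntil(k) returns all entries with keys < k, still as a sorted TreeMap.
  def precedingWindows[K, V](data: TreeMap[K, V], window: Int): List[(K, TreeMap[K, V])] =
    data.keys.toList.map(k => k -> data.rangeUntil(k).takeRight(window))

  def main(args: Array[String]): Unit = {
    val dataByDay = TreeMap((1 to 7).map(i => s"Day$i" -> s"Data-$i"): _*)
    precedingWindows(dataByDay, 3).foreach { case (day, prior) =>
      println(s"On $day data for days ${prior.keys.mkString(",")} will be used")
    }
  }
}
```

This rescans the prefix for every key, which is fine for a handful of days; for long histories the `scanLeft` style keeps the work linear.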

Upvotes: 2
