jay

Reputation: 477

Read a Scala stream until the criteria have come and gone

I'm reading a stream, looking for JSON values within a certain time interval. I want to cache everything read before and during that interval. Once the stream has yielded all values in the interval (assume chronological order), reading should stop.

The code below caches every value in a mutable list and then filters down to the JSON in the date range. However, it doesn't stop reading once the interval has passed; it caches everything in the stream.

val cache: MutableList[JsValue] = MutableList.empty

pg.pins.flatten
  .map( record => {cache += record; record})
  .filter( js => ( js \ "created_at").as[Date].after(start) && ( js \ "created_at").as[Date].before(end) )

Any help is appreciated.

Upvotes: 0

Views: 245

Answers (1)

Dima

Reputation: 40500

You don't need a mutable list for that (avoid mutable structures unless there is a real reason to use them). Assuming pg.pins is a Stream and the data is sorted by date (which your question seems to imply), something like this should do what you want:

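 // Stop reading at the first record that is not before `end`, then split
 // what was read: `after` holds records later than `start`, `before` the rest.
 val (after, before) = pg.pins.flatten
   .takeWhile { js => (js \ "created_at").as[Date].before(end) }
   .partition { js => (js \ "created_at").as[Date].after(start) }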
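
If you want to convince yourself that this stops reading early, here is a minimal self-contained sketch. It is not your actual code: Pin, readUntilEnd and the pulled counter are hypothetical names made up for the illustration, a plain case class stands in for the JSON record, and an Iterator stands in for pg.pins.flatten. The counter is mutable only so the demo can report how many elements were consumed.

```scala
import java.util.Date

object TakeWhileDemo {
  // Hypothetical stand-in for a JSON record; only the timestamp matters here.
  final case class Pin(id: Int, createdAt: Date)

  // Returns (records after `start`, records at or before `start`, elements consumed).
  // Assumes `source` is in chronological order.
  def readUntilEnd(source: Iterator[Pin], start: Date, end: Date): (List[Pin], List[Pin], Int) = {
    var pulled = 0 // demo-only counter of elements taken from the source
    val counted = source.map { p => pulled += 1; p }

    // Everything strictly before `end` is kept; reading stops at the first record that is not.
    val cached = counted.takeWhile(_.createdAt.before(end)).toList

    // Split the cached records into those inside the interval and those preceding it.
    val (inRange, beforeRange) = cached.partition(_.createdAt.after(start))
    (inRange, beforeRange, pulled)
  }
}
```

With ten records in chronological order and an interval covering the middle of them, the counter should come out lower than ten: takeWhile has to look at the first record that fails the test, but nothing after it is read.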

Upvotes: 1
