Fashil

Reputation: 185

How to make a for loop sleep at a fixed interval (Scala)

I'm new to Scala. I use the code below to read lines (rows) one by one from a CSV file (20 records) in a for loop and send each one to Kafka:

for (line <- FileStream.getLines) {
    val today = Calendar.getInstance.getTime
    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val key = UUID.randomUUID().toString().split("-")(0)
    val value = formatter.format(today) + "," + line
    val data = new ProducerRecord[String, String](topic, key, value)
 
    println(data.value())
    
    producer.send(data)
}

What I need is this: after the loop has read 5 rows (that is, after 5 iterations), it should sleep for 5 seconds and then continue from where it left off (from the 6th row).

The output I expect is:

2012-08-13T00:00:00.000Z,92.29,92.59,91.74,92.4,2075391.0,MMM
2012-08-14T00:00:00.000Z,92.36,92.5,92.01,92.3,1843476.0,MMM
2012-08-15T00:00:00.000Z,92.0,92.74,91.94,92.54,1983395.0,MMM
2012-08-16T00:00:00.000Z,92.75,93.87,92.21,93.74,3395145.0,MMM
2012-08-17T00:00:00.000Z,93.93,94.3,93.59,94.24,3069513.0,MMM 
               
                  ------5 seconds sleep------
   
2012-08-20T00:00:00.000Z,94.0,94.17,93.55,93.89,1640008.0,MMM
2012-08-21T00:00:00.000Z,93.98,94.1,92.99,93.21,2302988.0,MMM
2012-08-22T00:00:00.000Z,92.56,93.36,92.43,92.68,2463908.0,MMM
2012-08-23T00:00:00.000Z,92.65,92.68,91.79,91.98,1823757.0,MMM
2012-08-24T00:00:00.000Z,92.03,92.97,91.94,92.83,1945796.0,MMM

So how can we achieve that?

Upvotes: 2

Views: 1368

Answers (2)

Chema

Reputation: 2828

Though I personally don't like it, you could use a var counter, which introduces mutable state, as in the following example:

var count = 0
for (line <- FileStream.getLines) {
  val today = Calendar.getInstance.getTime
  val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  val key = UUID.randomUUID().toString().split("-")(0)
  val value = formatter.format(today) + "," + line
  val data = new ProducerRecord[String, String](topic, key, value)

  println(data.value())
  producer.send(data)

  // Count the line just sent, then pause after every fifth one
  count += 1
  if (count % 5 == 0) {
    println()
    println("***Sleeping for five seconds***")
    println()
    Thread.sleep(5000)
  }
}
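
If you would rather avoid the var, one possible sketch is to pair each line with its position using zipWithIndex. The helper name sendWithPauses and its send and pause parameters are hypothetical stand-ins for producer.send and Thread.sleep(5000), introduced here only so the snippet runs without a Kafka dependency:

```scala
object ThrottledLoop {
  // Hypothetical helper, not part of any library.
  // `send` stands in for building and sending the ProducerRecord,
  // `pause` stands in for Thread.sleep(5000).
  def sendWithPauses(lines: Iterator[String], batchSize: Int)(
      send: String => Unit)(pause: () => Unit): Unit =
    for ((line, index) <- lines.zipWithIndex) {
      send(line)
      // index is zero-based, so index + 1 is the number of lines sent so far
      if ((index + 1) % batchSize == 0) pause()
    }
}
```

With the real producer, the call would look roughly like ThrottledLoop.sendWithPauses(FileStream.getLines, 5)(line => ...)(() => Thread.sleep(5000)), where the first function builds and sends the record as in the loop above.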

Upvotes: 0

Alfilercio

Reputation: 1118

You could first transform this for loop into an iterator, moving the common elements out of the loop:

val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
// be careful: SimpleDateFormat is not thread-safe

// map on an Iterator is lazy: nothing is sent until the iterator is consumed
val iter: Iterator[Unit] = FileStream.getLines.map { line =>
  val today = Calendar.getInstance.getTime
  val key = UUID.randomUUID().toString().split("-")(0)
  val value = formatter.format(today) + "," + line
  val data = new ProducerRecord[String, String](topic, key, value)
  println(data.value())
  producer.send(data)
  () // return Unit so the element type matches Iterator[Unit]
}

Now you have a lazy iterator that sends one message per line as it is consumed. Let's make it work in batches of 5 messages with the sliding method:

val iter5 = iter.sliding(5,5)

This makes our iterator work in groups of 5 messages, with no repetition. To add the sleep, we append a sleep call after each batch:

val iterWithSleeps: Iterator[Unit] = iter5.flatMap(batch => batch :+ Thread.sleep(5000))

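flatMap produces a flattened iterator that executes a 5-second sleep after every 5 messages. Because the iterator is lazy, nothing is sent until you consume it, for example with iterWithSleeps.foreach(_ => ()).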
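
The same batching idea can also be sketched with grouped, which splits an iterator into consecutive, non-overlapping chunks. This is only one possible variant: the name sendInBatches and the send and pause parameters are hypothetical placeholders for the Kafka send and Thread.sleep(5000). Unlike the flatMap version, this sketch skips the pause after the final batch:

```scala
object BatchedSend {
  // Hypothetical helper, not part of any library.
  // `send` stands in for building and sending the ProducerRecord,
  // `pause` stands in for Thread.sleep(5000).
  def sendInBatches(lines: Iterator[String], batchSize: Int)(
      send: String => Unit)(pause: () => Unit): Unit = {
    // grouped yields consecutive chunks of up to batchSize lines
    val batches = lines.grouped(batchSize)
    batches.foreach { batch =>
      batch.foreach(send)
      // Pause only when another batch follows, so there is no sleep at the end
      if (batches.hasNext) pause()
    }
  }
}
```

Note that checking batches.hasNext reads the next chunk of lines ahead of the pause, which is harmless for a small file but worth knowing if the source is slow or unbounded.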

Upvotes: 1
