I'm new to Scala. I use the code below to read the lines (rows) of a CSV file (20 records) one by one in a for loop, and I send each one to Kafka:
for (line <- FileStream.getLines) {
  val today = Calendar.getInstance.getTime
  val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val key = UUID.randomUUID().toString().split("-")(0)
  val value = formatter.format(today) + "," + line
  val data = new ProducerRecord[String, String](topic, key, value)
  println(data.value())
  producer.send(data)
}
What I need is for the loop to sleep for 5 seconds after it has read 5 rows (i.e., after 5 iterations), and then continue from where it left off (from the 6th row).
The output I expect is:
2012-08-13T00:00:00.000Z,92.29,92.59,91.74,92.4,2075391.0,MMM
2012-08-14T00:00:00.000Z,92.36,92.5,92.01,92.3,1843476.0,MMM
2012-08-15T00:00:00.000Z,92.0,92.74,91.94,92.54,1983395.0,MMM
2012-08-16T00:00:00.000Z,92.75,93.87,92.21,93.74,3395145.0,MMM
2012-08-17T00:00:00.000Z,93.93,94.3,93.59,94.24,3069513.0,MMM
------5 seconds sleep------
2012-08-20T00:00:00.000Z,94.0,94.17,93.55,93.89,1640008.0,MMM
2012-08-21T00:00:00.000Z,93.98,94.1,92.99,93.21,2302988.0,MMM
2012-08-22T00:00:00.000Z,92.56,93.36,92.43,92.68,2463908.0,MMM
2012-08-23T00:00:00.000Z,92.65,92.68,91.79,91.98,1823757.0,MMM
2012-08-24T00:00:00.000Z,92.03,92.97,91.94,92.83,1945796.0,MMM
How can I achieve that?
Although I personally don't like it, because it implies mutable state, you could use a var counter, as in the following example:
import java.text.SimpleDateFormat
import java.util.{Calendar, UUID}
import org.apache.kafka.clients.producer.ProducerRecord

var count = 0
for (line <- FileStream.getLines) {
  val today = Calendar.getInstance.getTime
  val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val key = UUID.randomUUID().toString().split("-")(0)
  val value = formatter.format(today) + "," + line
  val data = new ProducerRecord[String, String](topic, key, value)
  println(data.value())
  producer.send(data)
  count += 1 // one more record sent
  // after every 5th record, pause before continuing with the next one
  if (count % 5 == 0) {
    println()
    println("***Sleeping for five seconds***")
    println()
    Thread.sleep(5000)
  }
}
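For comparison, here is a sketch of the same "pause after every 5 records" logic without a var, using zipWithIndex (a swapped-in technique, not the code above). The parameters send and pause are hypothetical stand-ins for producer.send(...) and Thread.sleep(5000), passed in so the logic can be run without Kafka:

```scala
// Same counting idea as above, but the index comes from zipWithIndex
// instead of a mutable counter.
// send and pause are hypothetical stand-ins for producer.send(...)
// and Thread.sleep(5000).
object PauseEveryN {
  def sendWithPauses[A](lines: Iterator[A], batchSize: Int)(send: A => Unit)(pause: () => Unit): Unit =
    for ((line, i) <- lines.zipWithIndex) {
      send(line)
      // i is zero-based, so i + 1 is the number of records sent so far
      if ((i + 1) % batchSize == 0) pause()
    }
}
```

With real Kafka this would be called roughly as PauseEveryN.sendWithPauses(FileStream.getLines, 5)(line => producer.send(...))(() => Thread.sleep(5000)), where the record-building code from the loop goes inside the send function.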
You could first turn this for loop into an iterator, moving the elements that don't change per line (such as the formatter) out of the loop:
import java.text.SimpleDateFormat
import java.util.{Calendar, UUID}
import org.apache.kafka.clients.producer.ProducerRecord

// created once, outside the per-line work
val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
// be careful: SimpleDateFormat is not thread-safe
val iter: Iterator[Unit] = FileStream.getLines.map { line =>
  val today = Calendar.getInstance.getTime
  val key = UUID.randomUUID().toString().split("-")(0)
  val value = formatter.format(today) + "," + line
  val data = new ProducerRecord[String, String](topic, key, value)
  println(data.value())
  producer.send(data)
  () // discard the send Future; only the side effect matters here
}
Now you have a lazy iterator that reads lines and sends messages as it is consumed. Let's make it work in batches of 5 messages with the sliding method:
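One thing worth checking here: iterators built with map (or with Iterator.continually) are lazy, so creating iter does not send anything by itself; the body only runs when elements are pulled. A minimal sketch, using a hypothetical counter in place of producer.send:

```scala
// Demonstrates that Iterator.map defers its side effects until consumption.
object LazyIteratorDemo {
  // Number of times the map body has run; stands in for producer.send calls
  var sent = 0

  // Same shape as iter: each line is mapped to a side effect returning Unit
  def build(lines: Iterator[String]): Iterator[Unit] =
    lines.map { _ =>
      sent += 1
      ()
    }
}
```

After val it = LazyIteratorDemo.build(Iterator("a", "b", "c")), sent is still 0; only after it.foreach(_ => ()) does it become 3.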
val iter5 = iter.sliding(5, 5)
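To see what sliding(5, 5) does on its own, here is a small sketch on plain numbers instead of Kafka messages: windows of size 5 that advance by 5, so they never overlap, and the last group may be shorter. This is the same grouping that grouped(5) produces:

```scala
// sliding(5, 5): window size 5, step 5, so consecutive windows do not overlap.
object SlidingDemo {
  def batches(n: Int): List[List[Int]] =
    (1 to n).iterator.sliding(5, 5).map(_.toList).toList
}
```

For example, SlidingDemo.batches(12) gives three groups: 1 to 5, 6 to 10, and a final short group with 11 and 12.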
Calling sliding(5, 5) makes the iterator work in non-overlapping groups of 5 messages (it is equivalent to grouped(5)). To sleep after each batch, we append a sleep call to it:
val iterWithSleeps: Iterator[Unit] = iter5.flatMap(batch => batch :+ { Thread.sleep(5000) })
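flatMap produces a flattened iterator that sleeps for 5 seconds after every 5 messages. Keep in mind that iterators are lazy: nothing is sent until iterWithSleeps is consumed, for example with iterWithSleeps.foreach(_ => ()).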
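Putting the pieces together, here is a Kafka-free sketch of the whole iterator pipeline, including the final step that consumes it. The names send and pause are hypothetical parameters standing in for producer.send(...) and Thread.sleep(5000), so the order of sends and pauses can be checked without a broker or a real 5-second wait:

```scala
// Iterator pipeline: send lazily, group into batches, pause after each batch.
// send and pause are hypothetical stand-ins for producer.send(...) and
// Thread.sleep(5000).
object BatchedSender {
  def run[A](lines: Iterator[A], batchSize: Int)(send: A => Unit)(pause: () => Unit): Unit = {
    // Each line is sent only when it is pulled from this iterator
    val sending: Iterator[Unit] = lines.map(line => send(line))
    // Pulling a batch sends its lines; pause() runs when flatMap builds
    // the flattened batch, i.e. right after those lines were sent
    val withPauses: Iterator[Unit] =
      sending.sliding(batchSize, batchSize).flatMap(batch => batch :+ pause())
    // Nothing happens until the iterator is consumed
    withPauses.foreach(_ => ())
  }
}
```

With 20 lines and a batch size of 5 this pauses four times. Note that, like the answer's version, it also pauses once after the last batch (even a partial one); if that matters, the pause can be skipped when the underlying iterator has no more elements.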