Reputation: 258
I am trying to write a while loop in a functional way in Scala. What I want to do is populate a list with the messages from a queue (Kafka in this case, but it doesn't really matter).
I'm doing this for an integration test. Since Kafka is running remotely when the tests run in CI, the test sometimes fails because Kafka does not return any messages. So I wrote a loop that queries Kafka until I get back all the results I expect (otherwise the test will time out after a while and fail). I have this right now:
var result = List[Int]()
while (result.size < expectedNumberOfMessages) {
  result = result ++ kafkaConsumer.poll(Duration.ofSeconds(10)).records(KAFKA_TOPIC).iterator().toList.map(_.value.getPayload)
}
This works fine, but it looks horrible to me. Plus, if it were production code, it would also be inefficient. Can anyone suggest a better way of doing this functionally?
Upvotes: 1
Views: 212
Reputation: 15086
Something like this perhaps?
def pollKafka = kafkaConsumer.poll(Duration.ofSeconds(10)).records(KAFKA_TOPIC).iterator.map(_.value.getPayload)
Iterator
  .continually(pollKafka)
  .flatten
  .take(expectedNumberOfMessages)
  .toList
Iterator is internally mutable, but if you use its high-level functional interface and don't reuse an Iterator, it's perfectly fine IMHO.
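To try the same pipeline without a broker, here is a minimal sketch that swaps the Kafka call for a stand-in. The names IteratorPollSketch, pollMock and the canned batches are made up for illustration and are not part of the Kafka API. It shows that empty polls are simply skipped and that take limits the result to the requested number of messages.

```scala
object IteratorPollSketch {
  // Hypothetical stand-in for kafkaConsumer.poll(...): hands out canned
  // batches (one of them empty), then empty lists once they run out.
  private val batches = Iterator(List(1, 2), List.empty[Int], List(3, 4, 5), List(6))

  def pollMock(): List[Int] =
    if (batches.hasNext) batches.next() else Nil

  // Keeps polling lazily until `expected` messages have been collected.
  // Like the original loop, this never terminates if the source stays empty,
  // so the test-level timeout is still needed.
  def collect(expected: Int): List[Int] =
    Iterator
      .continually(pollMock())
      .flatten
      .take(expected)
      .toList
}
```

Calling IteratorPollSketch.collect(4) yields the first four values across the batches; the fifth value of the last batch that was polled is not included in the result.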
If you want to go functional streams all the way down, you could consider a library like fs2.
Upvotes: 2
Reputation: 1316
If you plan on keeping the while loop, I would first suggest you use a scala.collection.mutable.ListBuffer instead of an immutable List. This will prevent making copies of the whole list in memory on each iteration.
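A rough sketch of what the ListBuffer version of the loop could look like. The pollMock function and its canned batches are hypothetical stand-ins for the real kafkaConsumer.poll(...) call, used only so that the snippet runs on its own.

```scala
import scala.collection.mutable.ListBuffer

object ListBufferPollSketch {
  // Hypothetical stand-in for the Kafka poll: canned batches, then nothing.
  private val batches = Iterator(List(1, 2), List.empty[Int], List(3, 4, 5))

  def pollMock(): List[Int] =
    if (batches.hasNext) batches.next() else Nil

  def collect(expected: Int): List[Int] = {
    val buffer = ListBuffer.empty[Int]
    // Same loop condition as the original; loops forever if the source
    // never delivers enough messages, so keep the test timeout in place.
    while (buffer.size < expected) {
      buffer ++= pollMock() // appends in place instead of rebuilding the list
    }
    buffer.toList
  }
}
```

As with the original loop, the result can contain more than the expected number of messages, because the whole last batch is appended.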
If you want a more "functional" way of writing the above code while keeping the Consumer API (instead of the Kafka Streams API), you could manually define a Scala Stream like so:
import scala.util.Random
// mock Kafka's "poll", returns a random number of Ints (max 10)
def poll(): List[Int] = {
  val size = Random.nextInt(10)
  println("fetching messages")
  Thread.sleep(1000)
  (1 to size).map(_ => Random.nextInt(10)).toList
}
lazy val s: Stream[Int] = Stream.continually(poll()).flatten
// s is now a Stream that will be evaluated when a certain number of messages is requested
// for example, fetching 40 results:
/*
scala> s.take(40).toList
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
fetching messages
res0: List[Int] = List(3, 6, 2, 7, 7, 8, 0, 4, 6, 2, 0, 3, 8, 9, 5, 8, 2, 9, 2, 7, 9, 2, 6, 1, 6, 7, 2, 4, 4, 6, 6, 3, 5, 7, 2, 0, 9, 4, 9, 4)
*/
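On Scala 2.13 and later, Stream is deprecated in favour of LazyList, so the same idea might look like the sketch below. It assumes Scala 2.13+; pollMock and the canned batches are hypothetical and replace the random mock above only to make the output predictable.

```scala
object LazyListPollSketch {
  // Hypothetical stand-in for the Kafka poll: canned batches, then nothing.
  private val batches = Iterator(List(1, 2), List.empty[Int], List(3, 4, 5), List(6))

  def pollMock(): List[Int] =
    if (batches.hasNext) batches.next() else Nil

  // Nothing is polled until elements are actually requested.
  lazy val messages: LazyList[Int] = LazyList.continually(pollMock()).flatten
}
```

A LazyList memoizes the elements it has already computed, so asking for a prefix a second time returns the same values without polling again.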
Upvotes: 2