igor

Reputation: 33

How to limit a Kotlin sequence with a predicate caring about the next element?

I need to address an API which returns chunked data.

val response = server.getChunk(request, chunkNumber)

data class PageInfo(
    val pageNumber: Int,
    val maxPages: Int
)
data class Response(
    val elements: List<Payload>,
    val pageInfo: PageInfo
)

I assume this API is idempotent.

I can do something like this (take 1):

var i = 0
var maxPages: Int
val payloadTotal = mutableListOf<Payload>()
do {
    val response = server.getChunk(request, i++)
    maxPages = response.pageInfo.maxPages
    payloadTotal.addAll(response.elements)
} while (i < maxPages)
    

It works, but it's ugly.

take 2:

val partners = IntStream
    .iterate(0) { it + 1 }
    .asSequence()
    .map { server.getChunk(request, it) }
    .takeWhile { it.pageInfo.pageNumber < it.pageInfo.maxPages }
    .map { it.elements }
    .flatten()
    .toList()

It looks much nicer, but it doesn't quite work as intended: the takeWhile predicate is evaluated only after the request has already been sent, so we always end up making one extra request. And since maxPages is unknown until the first response arrives, we can't use something like take(n: Int) either.
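The extra request is easy to reproduce without a real server. Below is a minimal sketch, assuming a hypothetical `FakeServer` that only records which page indices were asked for and returns the page number together with the page count. Neither `FakeServer` nor `requestsMadeByTakeWhile` is part of the real code, and `generateSequence` stands in for the `IntStream` source.

```kotlin
// Hypothetical stand-in for the server: records every page index that gets requested.
class FakeServer(private val maxPages: Int) {
    val requested = mutableListOf<Int>()

    // Returns (pageNumber, maxPages) instead of a full Response to keep the sketch short.
    fun getChunk(page: Int): Pair<Int, Int> {
        requested += page
        return page to maxPages
    }
}

// Runs the "take 2" pipeline shape and reports which pages were actually requested.
fun requestsMadeByTakeWhile(maxPages: Int): List<Int> {
    val server = FakeServer(maxPages)
    generateSequence(0) { it + 1 }
        .map { server.getChunk(it) }
        .takeWhile { (pageNumber, max) -> pageNumber < max }
        .toList()
    return server.requested
}
```

With `maxPages = 3` the recorded requests are 0, 1, 2 and 3: page 3 is fetched only so that `takeWhile` can reject it.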

So I ended up going with this code (take 3):

var maxPages = Int.MAX_VALUE
val partners = IntStream
    .iterate(0) { it + 1 }
    .asSequence()
    .takeWhile { it < maxPages }
    .map {
        server.getChunk(request, it)
            .also { response -> maxPages = response.pageInfo.maxPages }
    }
    .map { it.elements }
    .flatten()
    .toList()
        

It works. But then again, it relies on extra mutable state outside the pipeline, which I do not like.

Also, because of the "effectively final" restrictions this won't work in Java at all (and I would like it to as a bonus).

So how do I achieve the required behavior with a functional approach? Is there a way to do it without tracking any additional state outside of the pipeline?

*If such a way exists, does it also translate to Java?

Upvotes: 3

Views: 1135

Answers (3)

Joffrey

Reputation: 37680

In this case I would probably just make a sequence out of a do-while loop. It's quite readable for those who need to read the implementation, and quite useful for those who just care about the sequence:

fun Server.requestInPages(request: Request, startPage: Int = 0): Sequence<Response> = sequence {
    var page = startPage
    do {
        val response = getChunk(request, page++)
        yield(response)
        // Assumes zero-based page numbers: stop once the last page has been yielded.
    } while (response.pageInfo.pageNumber + 1 < response.pageInfo.maxPages)
}

And then use it like this:

val partners = server.requestInPages(request)
    .flatMap { it.elements }
    .toList()

If the responses are all the same, you could even add another helper that contains the flatMap on top of requestInPages. If nobody cares about the response structure and PageInfo at all, you could even make the flatMap part of requestInPages.
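As a rough sketch of such a helper, assuming the `Server` API has the shape shown in the question: the `Server` interface, the empty `Request` class, the `Payload` field, `requestAllElements` and `InMemoryServer` are all hypothetical names made up for illustration, and the stop condition assumes zero-based page numbers.

```kotlin
data class Payload(val id: Int)
data class PageInfo(val pageNumber: Int, val maxPages: Int)
data class Response(val elements: List<Payload>, val pageInfo: PageInfo)
class Request

// Assumed shape of the server API from the question.
interface Server {
    fun getChunk(request: Request, chunkNumber: Int): Response
}

// Same idea as requestInPages above, repeated so the snippet compiles on its own.
fun Server.requestInPages(request: Request, startPage: Int = 0): Sequence<Response> = sequence {
    var page = startPage
    do {
        val response = getChunk(request, page++)
        yield(response)
    } while (response.pageInfo.pageNumber + 1 < response.pageInfo.maxPages)
}

// Hypothetical helper that hides the paging and the response structure completely.
fun Server.requestAllElements(request: Request): Sequence<Payload> =
    requestInPages(request).flatMap { it.elements }

// Hypothetical in-memory server for trying the helper out: two payloads per page.
class InMemoryServer(private val pages: Int) : Server {
    var calls = 0
        private set

    override fun getChunk(request: Request, chunkNumber: Int): Response {
        calls++
        val elements = listOf(Payload(chunkNumber * 2), Payload(chunkNumber * 2 + 1))
        return Response(elements, PageInfo(chunkNumber, pages))
    }
}
```

Calling `InMemoryServer(3).requestAllElements(Request()).toList()` yields the six payloads of the three pages and makes exactly three calls to `getChunk`.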

Bonus points: this could even be turned into a flow if your API call becomes a suspend function at some point (e.g. if you switch to coroutines).

Upvotes: 3

broot

Reputation: 28322

We can create our own sequence operator for this purpose. It requires writing some code, but then we can use it very nicely:

generateSequence(0) { it + 1 }
    .map { server.getChunk(request, it) }
    .takeNextWhile { it.pageInfo.pageNumber + 1 < it.pageInfo.maxPages }
    .flatMap { it.elements }
    .toList()

fun <T> Sequence<T>.takeNextWhile(predicate: (T) -> Boolean): Sequence<T> = Sequence {
    val iter = iterator()
    var hasNext = iter.hasNext()
    object : Iterator<T> {
        override fun hasNext() = hasNext
        override fun next() = iter.next().also { hasNext = iter.hasNext() && predicate(it) }
    }
}

Alternatively, we can name it takeWhilePrevious() if that makes more sense to us.
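To check that the operator really avoids the extra request, here is a small self-contained sketch. The `Page` class and `pagesRequested` are hypothetical stand-ins for the real server call, and page numbers are assumed to be zero-based with `maxPages` being the page count.

```kotlin
// The operator from above, repeated so the snippet compiles on its own.
fun <T> Sequence<T>.takeNextWhile(predicate: (T) -> Boolean): Sequence<T> = Sequence {
    val iter = iterator()
    var hasNext = iter.hasNext()
    object : Iterator<T> {
        override fun hasNext() = hasNext
        override fun next() = iter.next().also { hasNext = iter.hasNext() && predicate(it) }
    }
}

// Hypothetical stand-in for a paged response.
data class Page(val pageNumber: Int, val maxPages: Int)

// Runs the pipeline against a fake "server" and reports which pages were requested.
fun pagesRequested(maxPages: Int): List<Int> {
    val requested = mutableListOf<Int>()
    generateSequence(0) { it + 1 }
        .map { requested += it; Page(it, maxPages) }
        .takeNextWhile { it.pageNumber + 1 < it.maxPages }
        .toList()
    return requested
}
```

For `maxPages = 3` only pages 0, 1 and 2 are requested: the predicate is evaluated on the element that was just returned, before the upstream `map` is asked for another one.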

If we don't want to create new operators, then I think it is better to not use functional transformations at all as they worsen the code readability in this case. Assuming maxPages does not change, we can first get it and then perform a very simple loop:

val first = server.getChunk(request, 0)
val payloads = first.elements.toMutableList()
(1 until first.pageInfo.maxPages).forEach {
    payloads += server.getChunk(request, it).elements
}

Upvotes: 4

sidgate

Reputation: 15234

A small workaround: you can pass a null response as part of the seed to generateSequence, so that the first element carries no response yet:

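val partners = generateSequence(0 to (null as Response?)) { (page, response) ->
    // page is the next (zero-based) page to request; returning null ends the sequence
    // before any request past the last page is made
    if (response != null && page >= response.pageInfo.maxPages) null
    else page + 1 to server.getChunk(request, page)
}.flatMap {
    it.second?.elements ?: emptyList()
}.toList()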

(The readability goal is still not met, though.) Some clarity can be gained by moving the logic into named functions:

data class PageResponse(val page: Int = 0, val response: Response? = null) {
  // page is the index of the next page to request (zero-based)
  fun isNotLastPage() = response == null || page < response.pageInfo.maxPages

  fun getPayload() = response?.elements ?: emptyList()
}

val partners = generateSequence(PageResponse()) { current ->
  // Returning null ends the sequence, so no request is made past the last page
  if (current.isNotLastPage()) PageResponse(current.page + 1, server.getChunk(request, current.page)) else null
}
.flatMap(PageResponse::getPayload)
.toList()

Upvotes: 0
