Cyber Avater

Reputation: 2147

How to get flow output from a non-flow function

I want flow output (return type Flow<T>) from a non-flow function (return type T).

// Say this is a library function that returns the number of files (Int) in the folder at the moment it is called.
fun getTotalFiles(): Int

// And say this is a library function that returns all the files (List<File>) currently in the folder.
fun getAllFiles(): List<File>

The files in that folder can and will change in the future.

Now, I want to constantly observe the output, so how do I implement it?

// A wrapper function that converts the library function's return type to an observable flow, Flow<Int>.
fun getFlowOfTotalFiles(): Flow<Int> = TODO()

// A wrapper function that converts the library function's return type to an observable flow, Flow<List<File>>.
fun getFlowOfAllFiles(): Flow<List<File>> = TODO()

Upvotes: 2

Views: 133

Answers (2)

Tenfour04

Reputation: 93511

For specifically monitoring a directory for files, you can use WatchService and convert it to a flow with the flow builder. Something like this:

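import java.nio.file.FileSystems
import java.nio.file.Path
import java.nio.file.StandardWatchEventKinds.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.yield

// Emits Unit whenever something in the directory is created, deleted, or modified.
fun getDirectoryMonitorFlow(directory: String) = flow {
    FileSystems.getDefault().newWatchService().use { watchService ->
        // Register once, outside the loop, instead of on every iteration.
        val watchKey = Path.of(directory).register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)
        while (true) {
            if (watchKey.pollEvents().isNotEmpty()) {
                emit(Unit)
            }
            yield() // give flow opportunity to be cancelled.
            if (!watchKey.reset()) {
                println("Directory became unreadable. Finishing flow.")
                break
            }
        }
    }
}
    .catch { e -> println("Exception while monitoring directory: $e") }
    .flowOn(Dispatchers.IO)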

And then your class might look like:

fun getFlowOfTotalFiles(): Flow<Int> = getFlowOfAllFiles()
    .map { it.size }
    .distinctUntilChanged()

fun getFlowOfAllFiles(): Flow<List<File>> = flow {
    emit(Unit) // so current state is always emitted
    emitAll(getDirectoryMonitorFlow(directory))
}
    .map {
        File(directory).listFiles()?.toList().orEmpty()
    }
    .flowOn(Dispatchers.IO)
    .distinctUntilChanged()

You might also consider making the first flow a private SharedFlow (for example with shareIn), so that you aren't running multiple WatchServices to monitor the same directory concurrently.
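A minimal sketch of that SharedFlow idea, under some assumptions: the class name DirectoryRepository, the injected scope, and the monitor parameter are hypothetical and only there for illustration. In real use, monitor would be getDirectoryMonitorFlow(directory); the demo passes emptyFlow() as a stand-in to stay short.

```kotlin
import java.io.File
import java.nio.file.Files
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.runBlocking

// Hypothetical class: the name and constructor parameters are assumptions.
class DirectoryRepository(
    private val directory: String,
    scope: CoroutineScope,   // lifetime of the shared monitor
    monitor: Flow<Unit>,     // e.g. getDirectoryMonitorFlow(directory)
) {
    // One upstream collection is shared by every collector. WhileSubscribed()
    // starts it with the first collector and stops it after the last one leaves.
    private val changes: SharedFlow<Unit> =
        monitor.shareIn(scope, SharingStarted.WhileSubscribed())

    fun getFlowOfAllFiles(): Flow<List<File>> = changes
        .onStart { emit(Unit) } // so the current state is always emitted first
        .map { File(directory).listFiles()?.toList().orEmpty() }
        .flowOn(Dispatchers.IO)
        .distinctUntilChanged()

    fun getFlowOfTotalFiles(): Flow<Int> = getFlowOfAllFiles()
        .map { it.size }
        .distinctUntilChanged()
}

fun main() = runBlocking {
    val dir = Files.createTempDirectory("monitor-demo").toFile()
    val scope = CoroutineScope(Dispatchers.Default)
    // emptyFlow() stands in for the real directory monitor in this demo.
    val repo = DirectoryRepository(dir.path, scope, emptyFlow())
    println(repo.getFlowOfTotalFiles().first()) // file count of the fresh temp directory
    scope.cancel() // stops the shared upstream collection
}
```

Because the shared flow never completes, collectors need to be cancelled (or use a terminal operator such as first()), and the scope passed in should be cancelled when the monitor is no longer needed.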

Upvotes: 3

Sergio

Reputation: 30585

I believe you need an infinite loop inside a flow builder, something like the following:

fun getFlowOfTotalFiles(): Flow<Int> = flow {
    while (true) {
        emit(getTotalFiles())
        // delays for 5 sec before next request and 
        // terminates the infinite cycle when a coroutine, 
        // that collects this Flow, is canceled
        delay(5000) 
    }
}

fun getFlowOfAllFiles(): Flow<List<File>> = flow {
    while (true) {
        emit(getAllFiles())
        delay(5000)
    }
}
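Since both functions repeat the same loop, the polling could be factored into one generic helper. The sketch below is only an illustration: pollingFlow is a hypothetical name, not part of kotlinx.coroutines, and the use of Dispatchers.IO and distinctUntilChanged() is an assumption that the library calls may block and that repeated identical results are not interesting.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: calls `block` every `periodMillis` milliseconds and
// emits the result only when it differs from the previously emitted value.
fun <T> pollingFlow(periodMillis: Long = 5_000, block: () -> T): Flow<T> = flow {
    while (true) {
        emit(block())
        delay(periodMillis) // suspension point, so cancelling the collector ends the loop
    }
}
    .flowOn(Dispatchers.IO) // run the possibly blocking call off the collector's thread
    .distinctUntilChanged()

fun main() = runBlocking {
    var calls = 0
    // Stand-in for a library call such as getTotalFiles(): returns 0, 0, 1, 1, 2, 2, ...
    val values = pollingFlow(periodMillis = 10) { calls++ / 2 }
        .take(3)
        .toList()
    println(values) // [0, 1, 2]
}
```

With this helper the two wrappers from the question would reduce to pollingFlow { getTotalFiles() } and pollingFlow { getAllFiles() }.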

Upvotes: 2
