Amin Mousavi
Amin Mousavi

Reputation: 1290

Changing request and get a new data stream when using Paging3 library

I have used Jetpack's Paging 3 library in my project for handling data pagination. I have a use case which when user changes something in search request (for example adding/removing some filters), I have to call API and repopulate my list with new data based on the new search request. But when I create the new Pager instance and pass it to my PagingDataAdapter adapter, it throws:

java.lang.IllegalStateException: Collecting from multiple PagingData concurrently is an illegal operation.

My implementation is like this:


Repository

class Repository {
    fun getDataStream(request: Request): Flow<PagingData<Response>> {
        return Pager(
            config = PagingConfig(
                pageSize = 10,
                initialLoadSize = 10,
                prefetchDistance = 3
            ),
            initialKey = 1,
            pagingSourceFactory = {
                DataPagingSource(
                    request = request,
                    repository = this
                )
            }
        ).flow
    }

    fun getData(page: Int, request: Request): Result<Response> {
        return remoteDataSource.getData(page, request)
    }
}

DataPagingSource

class DataPagingSource(
    private val request: Request,
    private val repository: Repository
) : PagingSource<Int, Response>() {
    override suspend fun load(params: LoadParams<Int>): LoadResult<Int, Response> {
        val page = params.key ?: 1
        // Result is a sealed class which has two derived classes: Success and Error
        return when (val result = repository.getData(page, request)) {
            is Result.Success -> {
                LoadResult.Page(
                    data = result.data,
                    nextKey = page.inc(),
                    prevKey = null
                )
            }
            is Result.Error -> LoadResult.Error(
                result.error
            )
        }
    }
}

ViewModel

class SomeViewModel(
    private val repository: Repository
): ViewModel() {
    private val _currentRequest = MutableLiveData<Request>()
   
    val data = _currentRequest
        .switchMap {
            repository
                .getDataStream(it)
                .cachedIn(viewModelScope)
                .asLiveData()
        }

    fun updateRequest(request: Request) {
        _currentRequest.postValue(request)
    }
}

Fragment

class SomeFragment: Fragment() {
    private lateinit var viewModel: SomeViewModel

    // ...
    
    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        // ...
        viewModel.data.observe(
            viewLifecycleOwner,
            Observer {
                lifecycleScope.launch {
                    adapter.submitData(it)
                }
            }
        )
    }
    // ...
}

It would be great if someone help fix this problem.
Thank you

Upvotes: 8

Views: 5372

Answers (3)

JackieHou
JackieHou

Reputation: 1

My implementation is like this:

DataPagingSource

class DataPagingSource(
    private val getRequest:() -> Request,
    private val repository: Repository
) : PagingSource<Int, Response>() {
    override suspend fun load(params: LoadParams<Int>): LoadResult<Int, Response> {
        val page = params.key ?: 1
        val request = getRequest()
        // Result is a sealed class which has two derived classes: Success and Error
        return when (val result = repository.getData(page, request)) {
            is Result.Success -> {
                LoadResult.Page(
                    data = result.data,
                    nextKey = page.inc(),
                    prevKey = null
                )
            }
            is Result.Error -> LoadResult.Error(
                result.error
            )
        }
    }
}

ViewModel

class SomeViewModel: ViewModel() {    
    
    private var _request :Request = DefaultRequest
    
    val data = Pager(
        config = PagingConfig(
            pageSize = 10,
            initialLoadSize = 10,
            prefetchDistance = 3
        ),
        initialKey = 1,
        pagingSourceFactory = {
            DataPagingSource(
                getRequest = {_request },
                repository = repository
            )
        }
    ).flow.cachedIn(viewModelScope)

    
    fun updateRequest(newRequest: Request) {
        if(_request != newRequest){
           _request = newRequest
        }
    }
}

Activity

viewModel.updateRequest(request)
pageAdapter.refresh()

Jatpack Compose

viewModel.updateRequest(request)
lazyPagingItems.refresh()

Upvotes: 0

NateisStalling
NateisStalling

Reputation: 73

I believe if you're observing your Pager as LiveData you need to use the adapter.submitData(lifecycle, data) method instead of adapter.submitData(data) in your Fragment, though I also recommend trying out the distinctUntilChanged() transformation on your _currentRequest LiveData to limit creating multiple PagingData objects from duplicate requests.

submitData(lifecycle: Lifecycle, pagingData: PagingData) Documentation

This method is typically used when observing a RxJava or LiveData stream produced by Pager. For Flow support, use the suspending overload of submitData, which automates cancellation via CoroutineScope instead of relying of Lifecycle

https://developer.android.com/reference/kotlin/androidx/paging/PagingDataAdapter#submitdata_1

Fragment

    viewModel.data.observe(
        viewLifecycleOwner,
        Observer {
            adapter.submitData(lifecycle, it)
        }
    )

ViewModel

    val data = _currentRequest
        // Limit duplicate Requests (Request class should implement equals())
        .distinctUntilChanged()  
        .switchMap {
        // ...
    }

Upvotes: 5

Amin Mousavi
Amin Mousavi

Reputation: 1290

I have found a solution to my problem. For Paging library to get data using new request model, You have to change request model and then call invalidate on your PagingDataSource. Here is an example:
In ViewModel the code changes like this:

class SomeViewModel: ViewModel() {    
    private var _dataPagingSource: DataPagingSource? = null
    private val _requestChannel = ConflatedBroadcastChannel<Request>()
    
    val data = Pager(
        config = PagingConfig(
            pageSize = 10,
            initialLoadSize = 10,
            prefetchDistance = 3
        ),
        initialKey = 1,
        pagingSourceFactory = {
            DataPagingSource(
                request = _requestChannel.value,
                repository = repository
            ).also {
                dataSource = it
            }
        }
    ).flow.cachedIn(viewModelScope).asLiveData()

    // ...
    
    // subscribe on requestChannel and invalidate dataSource each time
    // it emits new value
    init {
        _requestChannel
            .asFlow()
            .onEach { _dataPagingSource?.invalidate() }
            .launchIn(viewModelScope)
    }

    // call this method with the new request 
    fun updateRequest(newRequest: Request) {
        _requestChannel.send(newRequest)
    }
}

And Repository gets like this:

class Repository {
    // we do not need getDataStream method here anymore

    fun getData(page: Int, request: Request): Result<Response> {
        return remoteDataSource.getData(page, request)
    }
}

I do not know if there is any other way to do this. If you know of other ways it would be great to share it.

Upvotes: 4

Related Questions