X09

Reputation: 3956

How to manually call observer.onNext in rxJava

I am relatively new to RxJava/RxAndroid. Until now I have used AsyncTask for my long-running tasks. I have converted all of my AsyncTasks to RxJava except this one. The particular problem I am having is calling something like AsyncTask's publishProgress(params); from the background thread. I need to do this to update the progress of a ProgressBar.

First, this is the code in AsyncTask:

private static class AddBooksToDatabase extends AsyncTask<String, String, String> {
    //dependencies removed

    AddBooksToDatabase(AddBooksDbParams params) {
        //Removed assignment codes
    }

    @Override
    protected String doInBackground(String... strings) {
        //Initializing custom SQLiteOpenHelper and SQLite database
        File mFile = new File(mFolderPath);

        int booksSize = getFilesInFolder(mFile).size();
        String[] sizeList = {String.valueOf(booksSize)};
        //The first publishProgress is used to set the max of the progressbar
        publishProgress(sizeList);

        for (int i = 0; i < booksSize; i++) {
            //publishProgress with current item, current file
            publishProgress(String.valueOf(i), getFilesInFolder(mFile).get(i).getName());
            //Inserting current items in database. Code removed
        }
        return null;
    }

    @Override
    protected void onPreExecute() {
        //Show ProgressBar
    }

    @Override
    protected void onPostExecute(String s) {
        //Hide ProgressBar
    }

    @Override
    protected void onProgressUpdate(String... values) {
        super.onProgressUpdate(values);
        if (values.length == 1) {
            //The first call to publishProgress
            mProgressBar.setMax(Integer.parseInt(values[0]));
        } else {
            //Subsequent calls to publish progress
            Log.i(TAG, "Current item is " + values[0] + " and current file is " + values[1]);
            infoText.setText(values[1]);
            mProgressBar.setProgress(Integer.parseInt(values[0]), true);

        }
    }

    @Override
    protected void onCancelled() {
        cancel(true);
    }
}

Code Using RxJava

final Observable<String[]> addBooksObserver = Observable.create(new Observable.OnSubscribe<String[]>() {
        @Override
        public void call(Subscriber<? super String[]> subscriber) {
            subscriber.onNext(setAddSubscription());
            subscriber.onCompleted();
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

private String[] setAddSubscription() {
    //Initializing custom SQLiteOpenHelper and SQLite database
    File mFile = new File(mFolderPath);

    int booksSize = getFilesInFolder(mFile).size();
    String[] sizeList = {String.valueOf(booksSize)};
    //The first publishProgress is used to set the max of the progressbar
    addBooksObserver.doOnNext(addReturnParams(String.valueOf(sizeList.length), null, null));

    for (int i = 0; i < booksSize; i++) {
        EpubReader reader = new EpubReader();
        //publishProgress with current item, current file*
        addBooksObserver.doOnNext(addReturnParams(String.valueOf(sizeList.length),
                String.valueOf(i), getFilesInFolder(mFile).get(i).getName()));
        //Inserting current item in database. Code removed
    }
    return null;
}

private String[] addReturnParams(String totalItems, String currentItem, String currentFile) {
    return new String[]{totalItems, currentItem, currentFile};
}

The problem is that the lines starting with addBooksObserver.doOnNext(addReturnParams( produce this error: doOnNext (rx.functions.Action1) cannot be applied to (java.lang.String[])

I have no idea how to fix this, because I thought that since setAddSubscription() and addReturnParams(String totalItems, String currentItem, String currentFile) both return a String array, this shouldn't be a problem. Can you please help me out?

Upvotes: 2

Views: 4715

Answers (4)

0xAliHn

Reputation: 19240

You can use a Subject (for example, a PublishSubject) to call onNext() manually like this:

 PublishSubject<String> event = PublishSubject.create();

Now call onNext() to send an event:

 event.onNext("event");

Finally, you can expose it as an Observable with this code:

event.toFlowable(BackpressureStrategy.LATEST)
                .toObservable();
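
As a rough conceptual model of what "calling onNext() manually" on a subject means, here is a plain-Java sketch with no RxJava dependency. MiniPublishSubject and SubjectDemo are hypothetical names invented for this illustration; they are not RxJava classes and they leave out errors, completion, disposal and thread safety. The sketch only shows that the same object accepts values from the producer and forwards them to whoever has subscribed so far.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of a publish-style subject (not an RxJava class):
// it is both the object you push values into and the object listeners subscribe to.
// Not thread-safe; real subjects also handle errors, completion and disposal.
final class MiniPublishSubject<T> {
    private final List<Consumer<? super T>> listeners = new ArrayList<>();

    // Register a listener; it only sees values pushed after this call.
    void subscribe(Consumer<? super T> listener) {
        listeners.add(listener);
    }

    // Called manually by the producer; forwards the value to every listener.
    void onNext(T value) {
        for (Consumer<? super T> listener : listeners) {
            listener.accept(value);
        }
    }
}

final class SubjectDemo {
    // Returns the values the listener actually received.
    static List<String> run() {
        List<String> received = new ArrayList<>();
        MiniPublishSubject<String> event = new MiniPublishSubject<>();

        event.onNext("missed");          // no listener yet, so this value is dropped
        event.subscribe(received::add);  // listener collects everything from now on
        event.onNext("event");
        event.onNext("another event");
        return received;
    }
}
```

In this model, values pushed before any listener is registered are simply lost, so the subscription has to be in place before the producer starts emitting progress.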

Upvotes: 1

Alex

Reputation: 1644

Observable.create(emitter -> {
            for (int i = 0; i < 10; i++) {
                int[] ii = {i, i * 2};
                emitter.onNext(ii);
            }
            emitter.onComplete();
        }).observeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.io()).subscribe(o -> {
            // update progress
            int[] i = (int[]) o;
            Toast.makeText(SearchActivity.this, "oftad " + i[0] + " - " + i[1], Toast.LENGTH_SHORT).show();

        }, t -> {
            // on error
            Toast.makeText(SearchActivity.this, t.getMessage(), Toast.LENGTH_SHORT).show();
        }, () -> {
            // progress finished
            Toast.makeText(SearchActivity.this, "completed", Toast.LENGTH_SHORT).show();
        });

Upvotes: -1

Rich

Reputation: 1055

You just have to pass the values to the onNext method of your subscriber, not the doOnNext method of your observable!

You also have to subscribe to the observable. Try something like this for your observable:

Observable.create(new Observable.OnSubscribe<String[]>() {
  @Override
  public void call(Subscriber<? super String[]> subscriber) {
    setAddSubscription(subscriber);
    subscriber.onCompleted();
  }
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String[]>() {
  @Override
  public void onCompleted() {
   // handle 'operation is done'
  }

  @Override
  public void onError(Throwable e) {

  }

  @Override
  public void onNext(String[] values) {
    if (values.length == 1) {
        //The first call to publishProgress
        mProgressBar.setMax(Integer.parseInt(values[0]));
    } else {
        //Subsequent calls to publish progress
        Log.i(TAG, "Current item is " + values[0] + " and current file is " + values[1]);
        infoText.setText(values[1]);
        mProgressBar.setProgress(Integer.parseInt(values[0]), true);

    }
  }
});

You also need to modify your private methods a little bit:

private void setAddSubscription(Subscriber<? super String[]> subscriber) {
  //Initializing custom SQLiteOpenHelper and SQLite database
  File mFile = new File(mFolderPath);

  int booksSize = getFilesInFolder(mFile).size();
  //The first emission is a one-element array, used to set the max of the progressbar
  //(the subscriber's onNext checks values.length == 1 for this case)
  subscriber.onNext(new String[]{String.valueOf(booksSize)});

  for (int i = 0; i < booksSize; i++) {
    EpubReader reader = new EpubReader();
    //Subsequent emissions carry the current item in values[0] and the current file in values[1]
    subscriber.onNext(addReturnParams(String.valueOf(i),
            getFilesInFolder(mFile).get(i).getName()));
    //Inserting current item in database. Code removed
  }

}

private String[] addReturnParams(String currentItem, String currentFile) {
  return new String[]{currentItem, currentFile};
}
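
To illustrate the distinction made at the top of this answer, here is a plain-Java sketch with no RxJava dependency. MiniSubscriber, MiniObservable and ProgressDemo are hypothetical stand-ins invented for this example, not RxJava types, and the sketch ignores schedulers, errors and unsubscription. It only models the shapes of the two methods: onNext accepts a value, while doOnNext accepts a callback and returns a new observable, which is why handing a String[] to doOnNext does not compile.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for rx.Subscriber: values are delivered through onNext.
interface MiniSubscriber<T> {
    void onNext(T value);
    void onCompleted();
}

// Hypothetical stand-in for rx.Observable, reduced to create/doOnNext/subscribe.
final class MiniObservable<T> {
    private final Consumer<MiniSubscriber<? super T>> source;

    private MiniObservable(Consumer<MiniSubscriber<? super T>> source) {
        this.source = source;
    }

    static <T> MiniObservable<T> create(Consumer<MiniSubscriber<? super T>> source) {
        return new MiniObservable<T>(source);
    }

    // Takes a callback, not a value, and returns a NEW observable that runs
    // the callback on each value before forwarding it downstream.
    MiniObservable<T> doOnNext(Consumer<? super T> sideEffect) {
        return new MiniObservable<T>(downstream -> source.accept(new MiniSubscriber<T>() {
            @Override public void onNext(T value) {
                sideEffect.accept(value);
                downstream.onNext(value);
            }
            @Override public void onCompleted() {
                downstream.onCompleted();
            }
        }));
    }

    // Nothing is emitted until someone subscribes.
    void subscribe(MiniSubscriber<? super T> subscriber) {
        source.accept(subscriber);
    }
}

final class ProgressDemo {
    // Emits the item count first, then one {index, name} pair per item,
    // and records what the callback and the subscriber observe.
    static List<String> run(List<String> names) {
        List<String> log = new ArrayList<>();
        MiniObservable<String[]> progress = MiniObservable.<String[]>create(subscriber -> {
            subscriber.onNext(new String[]{String.valueOf(names.size())});
            for (int i = 0; i < names.size(); i++) {
                subscriber.onNext(new String[]{String.valueOf(i), names.get(i)});
            }
            subscriber.onCompleted();
        });
        progress
            .doOnNext(values -> log.add("peek:" + values.length)) // a callback, not a value
            .subscribe(new MiniSubscriber<String[]>() {
                @Override public void onNext(String[] values) {
                    log.add(values.length == 1 ? "max=" + values[0]
                                               : values[0] + ":" + values[1]);
                }
                @Override public void onCompleted() {
                    log.add("done");
                }
            });
        return log;
    }
}
```

Note that in this model, as in the question's code, calling doOnNext without using the observable it returns has no effect on the original stream.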

Upvotes: 1

Kosh

Reputation: 6334

Your Observable should be created with Observable.create(new Observable.OnSubscribe<String>(), and in your call method you should loop through the String array and pass each item to onNext, for example:

@Override
public void call(Subscriber<? super String> subscriber) {
   for(String val : setAddSubscription()) {
       subscriber.onNext(val);
   }
   subscriber.onCompleted();
}

Now onNext will deliver the individual items, and onCompleted will be called once the loop has finished.


Edit

myObserver.subscribe(new Subscriber<String>() {
  @Override
  public void onCompleted() {
   // handle completion. 
  }

  @Override
  public void onError(Throwable e) {

  }

  @Override
  public void onNext(String value) {
    // do whatever with each value passed to onNext
  }
});

Upvotes: 0
