matfur92
matfur92

Reputation: 330

Wait until more conditions with RxJava on each external thread - do single operation across threads

I need to create this:

Desiderata diagram with RxJava

Now I have a working solution for the creation of new threads and the resume on the main thread when all signHash(..) are completed on new threads. [See working code at the end of the post]

As you can see in the diagram, what I must create is a way to manage, on new separated threads, the "wait and resume functionallity" of a portion of the method signHash(..) that now is simulated with a sleep.

In particular, after String hash = doStuff(..) that must be present on that position, I want to populate a bean like this:

import java.util.Hashtable;
import java.util.Map;

public class DocumentHashBucket {    
    private int numberNeededHashes;
    private boolean completedStoreHashes;
    private boolean completedStoreSignedHashes;
    private Map<String,byte[]> map = new Hashtable<>();
} 

in order to store the hash created by String hash = doStuff(..) and when only all threads have done theirs doStuff(..) reaching the numberNeededHashes I must do only one time on a single thread the call to a cloud service on internet that sign hashes. When the signing hash on cloud is completed I'll could change the map with signed hashes and permit the resume of the signHash(..) that will execute the doStuff2(signedHash); and, after, close the method/thread.

REQUIREMENT: It is mandatory to call doStuff(..) and doStuff2(signedHash) in that position and manage the single external cloud call between them.

QUESTION: How can I do it easily with RxJava?

Thanks a lot in advance

Working code that I'm using:

package com.example.unit;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.junit.jupiter.api.Test;

import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Executor;

import static org.junit.jupiter.api.Assertions.*;

public class UTestRxJava {

    @Test
    void uRxJavaSimple() {

        // Create an iterable observables
        List<Integer> calls = new LinkedList<>();
        calls.add(1);
        calls.add(2);
        calls.add(3);
        calls.add(4);

        final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
        System.out.println("Starting parallel executions");

        // Create an iterable observables
        List<Observable<Integer>> observables = new LinkedList<>();
        for (final Integer i: calls) {
            System.out.println("Adding... "+i);
            observables.add(Observable.fromCallable(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return signHash(i);
                }
            }).subscribeOn(Schedulers.newThread())); // subscribeOn => specify the Scheduler on which an Observable will operate
        }

        final Map<String,String> mapResults = new HashMap<>();

        Observable.zip(observables, new Function<Object[], Object>() {
            @Override
            public Object apply(Object[] objects) throws Throwable { // Zip observables
                System.out.println("apply()");
                List<String> observables = new LinkedList<>();
                for (Object obj:objects) {
                    System.out.println("Applying... "+obj.toString());
                    observables.add(obj.toString());
                }
                return observables;
            }
        })
                .doOnNext(new Consumer<Object>() {
                    @Override
                    public void accept(Object results) throws Throwable {
                        System.out.println("Ending parallel executions");
                    }
                })
                .observeOn(Schedulers.from(new Executor() {
                    @Override
                    public void execute(Runnable runnable) {
                        tasks.add(runnable);// Add a scheduler with executor from the current thread
                    }}))
                .subscribe(new Consumer<Object>() {
                    // The Subscribe operator is the glue that connects an observer to an Observable
                    @Override
                    public void accept(Object results) throws Throwable { // Subscribe to the result.
                        // Put your code that needs to "wait"
                        for (String x : (List<String>)results) {
                            System.out.println("Results: "+x);
                            mapResults.put(x,"OK");
                        }
                    }
                });

        System.out.println("[START] TAKE-RUN");
        try {
            tasks.take().run();
        } catch (InterruptedException e) {e.printStackTrace();fail("it's not possible that there is an exception");}
        System.out.println("[END] TAKE-RUN");

        assertTrue(mapResults.size()==4);
    }

    private Integer signHash(Integer number) {
        String hash = doStuff(..)
        Integer result = number * number;
        System.out.println("Pre log \t"+Thread.currentThread().getName()+"\t"+ result);
        try {
            // TODO chage it with List<Hash> signHashesOnInternet(List<Hash>) only on one of all threads
            Thread.sleep(number * 1000);
        } catch (Exception e) {e.printStackTrace();}
        System.out.println("Post log \t"+Thread.currentThread().getName()+"\t"+ result);
        doStuff2(signedHash);
        return result;
    }

}

The output is:

Starting parallel executions
Adding... 1
Adding... 2
Adding... 3
Adding... 4
[START] TAKE-RUN
Pre log     RxNewThreadScheduler-1  1
Pre log     RxNewThreadScheduler-2  4
Pre log     RxNewThreadScheduler-3  9
Pre log     RxNewThreadScheduler-4  16
Post log    RxNewThreadScheduler-1  1
Post log    RxNewThreadScheduler-2  4
Post log    RxNewThreadScheduler-3  9
Post log    RxNewThreadScheduler-4  16
apply()
Applying... 1
Applying... 4
Applying... 9
Applying... 16
Ending parallel executions
Results: 1
Results: 4
Results: 9
Results: 16
[END] TAKE-RUN


Process finished with exit code 0

Upvotes: 3

Views: 1001

Answers (2)

Janos Breuer
Janos Breuer

Reputation: 480

The following code is a possible solution.

It creates a PublishSubject that will consume the unsigned hashes after doStuff() on each thread.

Then it creates an observable by transforming the PublishSubject, which will emit the hashes signed by the API call.

This transformed observable is shared using replay(), and it is awaited blockingly in the signHash function so that all invocations of signHash will wait for the results of the same API call.

package com.example.unit;

import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import org.junit.Test;

import java.util.*;

import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertTrue;

public class UTestRxJava {

    @Test
    public void test() {

        List<Integer> calls = Arrays.asList(1, 2, 3, 4);

        PublishSubject<Map.Entry<String, byte[]>> hashSubject = PublishSubject.create();

        ConnectableObservable<Map<String, byte[]>> connectable = hashSubject
                .toSerialized() // This is needed because we will post to the subject from different threads
                .take(calls.size()) // Wait for all unsigned hashes to arrive
                .toMap(Map.Entry::getKey, Map.Entry::getValue) // combine all unsigned hashes to a single map
                .flatMapObservable( // use the combined map to create an observable by...
                        unsignedHashes -> Observable.fromCallable(
                                () -> apiCall(unsignedHashes)) // calling the synchronous api with the map
                                .subscribeOn(Schedulers.io()))  // the api will be performed on the io() scheduler
                .replay(); // all subscribers will get the same api result

        connectable.connect();

        Map<Integer, Integer> result = Observable.fromArray(calls.toArray(new Integer[0]))
                .flatMap((Integer i) ->
                        Observable.fromCallable(() -> new AbstractMap.SimpleImmutableEntry<>(i, signHash(i, hashSubject, connectable.firstOrError())))
                                .subscribeOn(Schedulers.newThread()))
                .toMap(Map.Entry::getKey, Map.Entry::getValue)
                .blockingGet();

        assertEquals(4, result.size());
        System.out.println(Thread.currentThread().getName() + " Result list: " + Arrays.toString(result.entrySet().toArray()));
    }

    private Map<String, byte[]> apiCall(Map<String, byte[]> unsignedHashes) throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + " Calling API with unsigned hashes: " + Arrays.toString(unsignedHashes.entrySet().toArray()));

        Map<String, byte[]> signedHashes = new HashMap<>();

        for (Map.Entry<String, byte[]> entry : unsignedHashes.entrySet()) {
            int value = Integer.parseInt(new String(entry.getValue()));
            signedHashes.put(entry.getKey(), ("" + (value * value)).getBytes());
        }
        Thread.sleep(1000);
        return signedHashes;
    }

    private Integer signHash(Integer number, PublishSubject<Map.Entry<String, byte[]>> hashSubject, Single<Map<String, byte[]>> hashesSignedByApi) {
        System.out.println(Thread.currentThread().getName() + " [START] doStuff()");
        try {
            Thread.sleep(number * 1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " [END] doStuff()");

        byte[] numberStringInBytes = ("" + number).getBytes();
        String key = Base64.getEncoder().encodeToString(numberStringInBytes);

        hashSubject.onNext(new AbstractMap.SimpleImmutableEntry<>(key, numberStringInBytes));
        System.out.println(Thread.currentThread().getName() + " hashSubject.onNext with new hash : " + key);

        Map<String, byte[]> signedHashes = hashesSignedByApi.blockingGet();

        // check that the signing process is applied correctly
        assertTrue(Arrays.equals(signedHashes.get(key), ("" + (number * number)).getBytes()));

        System.out.println(Thread.currentThread().getName() + " [START] doStuff2()");
        try {
            Thread.sleep(number * 1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " [END] doStuff2()");

        return Integer.valueOf(new String(signedHashes.get(key)));
    }
}

Upvotes: 0

matfur92
matfur92

Reputation: 330

Here what I found and tested. I don't know if this is the optimal solution but for me now it is working.

I used BehaviorSubject and his method blockingForEach in order to observe changes on the DocumentHashBucket (that is the bean that I created).

With this approach the blockingForEach allow me to have a blocking code that don't continue the execution of the rest of code until behaviorSubject.onComplete() is called.

package com.example.unit;

import com.example.DocumentHashBucket;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import org.apache.commons.codec.binary.Base64;
import org.junit.jupiter.api.Test;

import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class UTestRxJavaStackOverflowSolution {

    @Test
    void uRxJavaSimple() {

        // Create an iterable observables
        List<Integer> calls = new LinkedList<>();
        calls.add(1);
        calls.add(2);
        calls.add(3);
        calls.add(4);

        // create the Observable that will be used for collect all hashes and store the signed version of each
        final DocumentHashBucket hashBucket = new DocumentHashBucket();
        hashBucket.setNumberDocuments(calls.size());
        final BehaviorSubject<DocumentHashBucket> behaviorSubject = BehaviorSubject.createDefault(hashBucket);

        // create the BlockingQueue in order to have a blocking point for the main thread
        final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
        System.out.println("Starting parallel executions");

        // Create an iterable observables
        List<Observable<Integer>> observables = new LinkedList<>();
        for (final Integer i: calls) {
            System.out.println("Adding... "+i);
            observables.add(Observable.fromCallable(new Callable<Integer>() {
                @Override
                public Integer call(){
                    Integer res = signHash(behaviorSubject, i);
                    assertTrue(res==i*i,"The result must be the square of the variable");
                    return res;
                }
            }).subscribeOn(Schedulers.newThread())); // subscribeOn => specify the Scheduler on which an Observable will operate
        }

        final Map<String,String> mapResults = new HashMap<>();

        Observable.zip(observables, new Function<Object[], Object>() {
            @Override
            public Object apply(Object[] objects) throws Throwable { // Zip observables
                System.out.println("apply()");
                List<String> observables = new LinkedList<>();
                for (Object obj:objects) {
                    System.out.println("Applying... "+obj.toString());
                    observables.add(obj.toString());
                }
                return observables;
            }
        })
                .doOnNext(new Consumer<Object>() {
                    @Override
                    public void accept(Object results){
                        System.out.println("Ending parallel executions");
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable){
                        System.err.println("Error on execution of "+Thread.currentThread().getName()+" : "+ throwable.getMessage());
                        throwable.printStackTrace();
                    }
                })
                .observeOn(Schedulers.from(new Executor() {
                    @Override
                    public void execute(Runnable runnable) {
                        tasks.add(runnable);// Add a scheduler with executor from the current thread
                    }}))
                .subscribe(new Consumer<Object>() {
                               // The Subscribe operator is the glue that connects an observer to an Observable
                               @Override
                               public void accept(Object onNext) { // Subscribe to the result.
                                   // Put your code that needs to "wait"
                                   for (String x : (List<String>) onNext) {
                                       System.out.println("Results: " + x);
                                       mapResults.put(x, "OK");
                                   }
                               }
                           },
                        new Consumer<Throwable>() {
                            // The Subscribe operator is the glue that connects an observer to an Observable
                            @Override
                            public void accept(Throwable onError) { // Subscribe to the result.
                                System.err.println("Error on execution in one thread detected on this (main) thread : " + onError.getMessage());
                                onError.printStackTrace();
                            }
                        },
                        new Action() {
                            @Override
                            public void run(){
                                System.out.println("onComplete");
                            }
                        });

        System.out.println("[START] TAKE-RUN");
        try {
            tasks.take().run();
        } catch (InterruptedException e) {
            System.err.println("Error on execution of zip: "+e.getMessage());
            e.printStackTrace();
            fail("it's not possible that there is an exception");
        }
        System.out.println("[END] TAKE-RUN");

        assertTrue(mapResults.size()==4);
    }

    private Integer signHash(final BehaviorSubject<DocumentHashBucket> behaviorSubject, Integer number) {
        System.out.println(Thread.currentThread().getName()+" [START] doStuff()");
        try {
            Thread.sleep(number * 1000);
        } catch (Exception e) {e.printStackTrace();}
        System.out.println(Thread.currentThread().getName()+" [END] doStuff()");


        DocumentHashBucket hashBucket = behaviorSubject.getValue();
        byte[] numberStringInBytes = ("" + number).getBytes();
        String key = Base64.encodeBase64String(numberStringInBytes);
        hashBucket.getMap().put(key, numberStringInBytes);
        System.out.println(Thread.currentThread().getName()+" hashBucket.getMap().put new hash : "+key);

        behaviorSubject.blockingForEach(new Consumer<DocumentHashBucket>() {
            @Override
            public void accept(DocumentHashBucket documentHashBucket) throws Throwable {
                System.out.println(Thread.currentThread().getName()+" [blockingForEach] DocumentHashBucket is changed and now contains "+documentHashBucket.getMap().size()+" elements");
                synchronized (documentHashBucket){
                    if(documentHashBucket.getNumberDocuments()==documentHashBucket.getMap().size() && documentHashBucket.isCompletedStoreHashes()==false){

                        System.out.println(Thread.currentThread().getName()+"|> --------- all hashes arrived ---------");
                        // all hashes arrived
                        documentHashBucket.setCompletedStoreHashes(true);

                        System.out.println(Thread.currentThread().getName()+" [START] simulate signHashesOnInternet(..)");
                        try {
                            // simulate network call
                            Thread.sleep(10 * 1000);
                        } catch (Exception e) {e.printStackTrace();}
                        // simulate signing hash on the DocumentHashBucket
                        for(String key : documentHashBucket.getMap().keySet()){
                            int value = Integer.parseInt(new String(documentHashBucket.getMap().get(key)));
                            documentHashBucket.getMap().put(key,(""+(value*value)).getBytes());
                        }
                        System.out.println(Thread.currentThread().getName()+" [END] simulate signHashesOnInternet(..)");

                        // DocumentHashBucket - now hashes are signed
                        documentHashBucket.setCompletedStoreSignedHashes(true);

                        // unlock the blockingForEach
                        behaviorSubject.onComplete();

                        System.out.println(Thread.currentThread().getName()+"|> --------- all hashes SAVED ---------");
                    }else{
                        System.out.println(Thread.currentThread().getName()+"changed but hashes are: "+documentHashBucket.getMap().size());
                    }
                }// synchronized
            }
        });

        // check that the signing process is applied correctly
        assertTrue(Arrays.equals(hashBucket.getMap().get(key),(""+(number*number)).getBytes()));

        System.out.println(Thread.currentThread().getName()+" [START] doStuff2()");
        try {
            Thread.sleep(number * 1000);
        } catch (Exception e) {e.printStackTrace();}
        System.out.println(Thread.currentThread().getName()+" [END] doStuff2()");

        assertTrue(hashBucket.getMap().size()==4,"The map must contains 4 elements");
        assertTrue(hashBucket.isCompletedStoreHashes(),"The flag completedStoreHashes must be true");
        assertTrue(hashBucket.isCompletedStoreSignedHashes(),"The flag completedStoreSignedHashes must be true");

        return number*number;
    }
}

Log of the execution:

Starting parallel executions
Adding... 1
Adding... 2
Adding... 3
Adding... 4
RxNewThreadScheduler-1 [START] doStuff()
RxNewThreadScheduler-2 [START] doStuff()
RxNewThreadScheduler-3 [START] doStuff()
[START] TAKE-RUN
RxNewThreadScheduler-4 [START] doStuff()
RxNewThreadScheduler-1 [END] doStuff()
RxNewThreadScheduler-1 hashBucket.getMap().put new hash : MQ==
RxNewThreadScheduler-1 [blockingForEach] DocumentHashBucket is changed and now contains 1 elements
RxNewThreadScheduler-1changed but hashes are: 1
RxNewThreadScheduler-2 [END] doStuff()
RxNewThreadScheduler-2 hashBucket.getMap().put new hash : Mg==
RxNewThreadScheduler-2 [blockingForEach] DocumentHashBucket is changed and now contains 2 elements
RxNewThreadScheduler-2changed but hashes are: 2
RxNewThreadScheduler-3 [END] doStuff()
RxNewThreadScheduler-3 hashBucket.getMap().put new hash : Mw==
RxNewThreadScheduler-3 [blockingForEach] DocumentHashBucket is changed and now contains 3 elements
RxNewThreadScheduler-3changed but hashes are: 3
RxNewThreadScheduler-4 [END] doStuff()
RxNewThreadScheduler-4 hashBucket.getMap().put new hash : NA==
RxNewThreadScheduler-4 [blockingForEach] DocumentHashBucket is changed and now contains 4 elements
RxNewThreadScheduler-4|> --------- all hashes arrived ---------
RxNewThreadScheduler-4 [START] simulate signHashesOnInternet(..)
RxNewThreadScheduler-4 [END] simulate signHashesOnInternet(..)
RxNewThreadScheduler-4|> --------- all hashes SAVED ---------
RxNewThreadScheduler-2 [START] doStuff2()
RxNewThreadScheduler-1 [START] doStuff2()
RxNewThreadScheduler-4 [START] doStuff2()
RxNewThreadScheduler-3 [START] doStuff2()
RxNewThreadScheduler-1 [END] doStuff2()
RxNewThreadScheduler-2 [END] doStuff2()
RxNewThreadScheduler-3 [END] doStuff2()
RxNewThreadScheduler-4 [END] doStuff2()
apply()
Applying... 1
Applying... 4
Applying... 9
Applying... 16
Ending parallel executions
Results: 1
Results: 4
Results: 9
Results: 16
onComplete
[END] TAKE-RUN


Process finished with exit code 0

Upvotes: 1

Related Questions