K2mil J33
K2mil J33

Reputation: 165

Can I use the same executor for subscribeOn method and async task

Hi I have a simple question, suppose that I have a class like below:

import lombok.Value;

import java.nio.file.Path;

@Value
class ImageResizeRequest {

    private DownloadedImage downloadedImage;

    private ImageSize imageSize;

    private Path destinationLocation;
}

Class above represent single task responsible for resize image to given size. I have a many requests to resize this image to many differents sizes.

@RequiredArgsConstructor
class ImageResizeService {

    private final Executor executor;

    Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {

        return Flux.fromIterable(requests)
                .flatMap(this::resize)
                .collectList()
                .subscribeOn(Schedulers.fromExecutor(executor));
    }

    private Mono<ImageResizeResult> resize(ImageResizeRequest request) {

        return Mono.fromFuture(CompletableFuture.supplyAsync(resizeTask(request), executor));

    }

    private Supplier<ImageResizeResult> resizeTask(ImageResizeRequest request) {
        return () -> {
            //TODO add image resize logic for example ImageMagick by Im4Java...
            /** code below call ImageMagick library
             ConvertCmd cmd = new ConvertCmd();
             IMOperation op = new IMOperation();
             op.quality(100d);
             op.addImage(request.getDestinationLocation().toString());
             cmd.run(op);

             */
            //TODO add logic!!!
            return new ImageResizeResult(null, null, null, null);
        };
    }
}

My question is: How to implement in Project Reactor parallel independent task responsible for resize image? Without Project reactor i would use List of CompletableFuture:

private static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
    CompletableFuture<Void> allDoneFuture =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
    return allDoneFuture.thenApply(v ->
            futures.stream().
                    map(future -> future.join()).
                    collect(Collectors.<T>toList())
    );
}

with specified executor service. Furthermore in my example I am using the same executor in subscribeOn method and in supplyAsync - is a good idea?

Upvotes: 0

Views: 1771

Answers (2)

K2mil J33
K2mil J33

Reputation: 165

So my all process looks like in the below:

@RequiredArgsConstructor
class ImageCommandProcessingService {

    private final DownloadRequestFactory downloadRequestFactory;
    private final ImageClientDownloader imageClientDownloader;
    private final ImageResizeRequestFactory imageResizeRequestFactory;
    private final ImageResizeService imageResizeService;

    Mono<List<ImageResizeResult>> process(ResizeImageCommand resizeImageCommand) {
        return Mono.just(resizeImageCommand)
                .map(command -> downloadRequestFactory.create(command.getImageUrl().getUrl()))
                .flatMap(imageClientDownloader::downloadImage)
                .map(downloadedImage -> imageResizeRequestFactory.createRequests(downloadedImage, resizeImageCommand.getSizes().toJavaList()))
                .flatMap(imageResizeService::resize);

    }

}

I have a command with url of image and set of sizes:

@Value
class ResizeImageCommand {

    private ImageUrl imageUrl;

    private Set<ImageSize> sizes;
}

Firstly i need to download image on disk, so I create a download request by factory:

@RequiredArgsConstructor
class DownloadRequestFactory {

    private final ImageLocationPathResolver resolver;

    DownloadRequest create(String url) {
        return new DownloadRequest(url, resolver.resolveDownloadedLocation(url));
    }
}

Resolver is a class responsible for create a Path to temporary file, and for create path for resized image:

class ImageLocationPathResolver {

    private String temporaryImagesFolder;
    private String destinationImagesFolder;

    Path resolveDownloadedLocation(String imageUrl) {
        LocalDateTime now = LocalDateTime.now();
        String fileName = now.toString() + "_" + getFileNameExtensionFromUrl(imageUrl);
        return Paths.get(temporaryImagesFolder,getDatePaths(now.toLocalDate()), fileName);
    }

    Path resolveDestinationLocation(ImageSize imageSize, String url) {
        String fileName = getFileNameExtensionFromUrl(url);
        return Paths.get(destinationImagesFolder, imageSize.getName(), getDatePaths(LocalDate.now()), fileName);
    }

    private String getFileNameExtensionFromUrl(String url) {
        return StringUtils.getFilenameExtension(url);
    }

    private String getDatePaths(LocalDate now) {
        return now.getYear() + File.pathSeparator + now.getMonth() + File.pathSeparator + now.getDayOfMonth();
    }
}

Farther I have a client responsible for download operation:

public interface ImageClientDownloader {

    Mono<DownloadedImage> downloadImage(DownloadRequest downloadRequest);
}

and implementation:

@Slf4j
class HttpImageClientDownloader implements ImageClientDownloader {

    private final WebClient webClient;

    HttpImageClientDownloader() {
        this.webClient = WebClient.create();
    }

    @Override
    public Mono<DownloadedImage> downloadImage(DownloadRequest downloadRequest) {
        try {
            Flux<DataBuffer> dataBuffer = webClient.get()
                    .uri(downloadRequest.getUrl())
                    .retrieve()
                    .bodyToFlux(DataBuffer.class);


            Path resultFilePath = Files.createFile(downloadRequest.getLocation());
            WritableByteChannel channel = Files.newByteChannel(resultFilePath, StandardOpenOption.WRITE);
            return DataBufferUtils.write(dataBuffer, channel)
                    .map(DataBufferUtils::release)
                    .then(Mono.just(new DownloadedImage(downloadRequest.getUrl(), resultFilePath, LocalDateTime.now())));

        } catch (Exception e) {
            log.error(e.getMessage(), e);
            return Mono.error(e);
        }
    }
}

It is IO operation. Should i use dedicated scheduler? At the end I have resize operation, request is created inside map operation - imageResizeRequestFactory.

Upvotes: 0

Simon Basl&#233;
Simon Basl&#233;

Reputation: 28351

Don't continuously re-create the Scheduler from the ExecutorService but strive to wrap it in the constructor directly.

You don't need the CompletableFuture at all, and subscribeOn should be applied to the inside of the flatMap to potentially pick separate threads per resize task (it picks one thread out of the pool per Flux it applies to):

class ImageResizeService {

  private final Executor executor; //TODO prefer an ExecutorService if possible
  private final Scheduler scheduler; //FIXME Schedulers.fromExecutor(executor)

  Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
    //we get the requests on IO thread
    return Flux.fromIterable(requests)
            //for each request, perform asynchronous resize...
            .flatMap(r -> Mono
                //... by converting the resizeTask Callable to a Mono
                .fromCallable(r -> resizeTask(r).get())
                //... and making sure it executes on the executor
                .subscribeOn(scheduler)
            )
            .collectList();
  }
}

In order to achieve true parallelisation you have another option: parallel().runOn():

Mono<List<ImageResizeResult>> resize(List<ImageResizeRequest> requests) {
    //we get the requests on IO thread
    return Flux.fromIterable(requests)
            //divide into N workloads
            //the executor _should_ be capable of this degree of parallelisation:
            .parallel(NUMBER_OF_DESIRED_THREADS)
            //actually tell to run each workload on a thread picked from executor
            .runOn(scheduler) 
            //here the workload are already running on their dedicated thread,
            //we can afford to block it and thus apply resize in a simpler `map`
            .map(r -> resizeTask(r).get()) //NB: the Supplier aspect can probably be removed
            //go back to a `Flux` sequence for collection into list
            .sequential()
            .collectList();
}

Upvotes: 1

Related Questions