ek1984

Reputation: 131

Spring WebFlux: convert Flux<DataBuffer> to InputStream

I'm currently working with Spring WebFlux and trying to upload a large file (70 MB).

My Controller

@RequestMapping(method = RequestMethod.POST, consumes = MediaType.MULTIPART_FORM_DATA_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<String> uploadHandler(@RequestBody Flux<Part> fluxParts, @RequestParam(value = "categoryType") String categoryType, @PathVariable(value = "traceabilityReportUuid") String traceabilityUuid) {
    return documentHandler.upload(fluxParts, UUID.fromString(traceabilityUuid), categoryType);
}

My Service

public Flux<String> upload(Flux<Part> fluxParts, UUID traceabilityUuid, String categoryType) {

    return fluxParts
            .filter(part -> part instanceof FilePart)
            .ofType(FilePart.class)
            .flatMap(p -> this.upload(p, traceabilityUuid, categoryType));


}


private Mono<String> upload(FilePart filePart, UUID traceabilityUuid, String categoryType) {

    return filePart.content().collect(InputStreamCollector::new, (t, dataBuffer) -> t.collectInputStream(dataBuffer.asInputStream()))
            .flatMap(inputStreamCollector -> {
                upload(traceabilityUuid, inputStreamCollector.getInputStream(), filePart.filename(), categoryType);

                return Mono.just("OK");
            });
}

My Collector

public class InputStreamCollector {

    private InputStream is;

    public void collectInputStream(InputStream is) {
        if (this.is == null) this.is = is;
        this.is = new SequenceInputStream(this.is, is);
    }

    public InputStream getInputStream() {
        return this.is;
    }
}

Finally, I retrieve the full input stream with inputStreamCollector.getInputStream() and pass it to my object.

I then use this object to send the data to an S3 bucket.

But before sending to S3, I have to convert it into a file (using Apache tools), and at that point I get a StackOverflowError.

java.lang.StackOverflowError: null
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)
at java.base/java.io.SequenceInputStream.read(SequenceInputStream.java:156)

It works fine with a small file (7 MB).

Do you have any idea how to resolve this issue?

Upvotes: 7

Views: 25927

Answers (4)

Archimedes Trajano

Reputation: 41300

Given a Flux<DataBuffer> fluxPublisher

final var publisher = fluxPublisher
  .map(dataBuffer -> dataBuffer.asInputStream(true))
  .reduce(SequenceInputStream::new);

This gives you a Mono<InputStream>.

The result must be closed after processing, otherwise it will leak. The following is an example of parsing with a Reader and then closing it:

publisher
  .flatMap(is -> {
    try (var reader = new InputStreamReader(is)) {
      // parse, then return the parsed value, e.g. Mono.just(value)
      return Mono.empty();
    } catch (IOException e) {
      return Mono.error(e);
    }
  });
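
One caveat with pairwise chaining: each new SequenceInputStream(a, b) wraps the previous one, so a read has to descend through one wrapper per chunk, which matches the repeated SequenceInputStream.read frames in the question's stack trace. Below is a minimal JDK-only sketch of a flat alternative that uses the SequenceInputStream(Enumeration) constructor. The names FlatConcat, concat and readAll are made up for illustration, and the small in-memory streams stand in for the DataBuffer chunks.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FlatConcat {

    // Builds ONE SequenceInputStream over all chunks instead of nesting
    // a new wrapper around the previous stream for every chunk.
    public static InputStream concat(List<InputStream> chunks) {
        return new SequenceInputStream(Collections.enumeration(chunks));
    }

    // Helper for the demo: reads everything as UTF-8 and closes the stream.
    public static String readAll(InputStream in) {
        try (in) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Many tiny chunks, standing in for the buffers of a large upload.
        List<InputStream> chunks = new ArrayList<>();
        for (int i = 0; i < 50_000; i++) {
            chunks.add(new ByteArrayInputStream("x".getBytes(StandardCharsets.UTF_8)));
        }
        System.out.println(readAll(concat(chunks)).length()); // 50000
    }
}
```

In a reactive pipeline the list could come from collectList() on the mapped input streams, at the cost of keeping every buffer alive until the stream has been read.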

Upvotes: 1

Milton BO

Reputation: 608

This example will help you to understand how to load data from FilePart:

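public static Mono<String> readBase64Content(FilePart filePart) {
    // Join all chunks first: encoding each chunk on its own and keeping
    // only the last one would drop data for multi-chunk files.
    return DataBufferUtils.join(filePart.content()).map(dataBuffer -> {
         byte[] bytes = new byte[dataBuffer.readableByteCount()];
         dataBuffer.read(bytes);
         DataBufferUtils.release(dataBuffer); // avoid leaking the pooled buffer
         return Base64.getEncoder().encodeToString(bytes);
    });
}

REST method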

@PostMapping(value = "/person/{personId}/photo", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
Mono<String> uploadPhoto(@PathVariable Long personId, @RequestPart("photo") Mono<FilePart> photo) {
        return photo.ofType(FilePart.class).flatMap(StringUtil::readBase64Content);
}
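
When a file arrives in several chunks, Base64 has to be applied to the whole byte sequence: encoding chunks independently produces different text, because padding is emitted per chunk. Here is a small JDK-only sketch of accumulating chunks before encoding. Base64Chunks and encodeAll are illustrative names, and the byte arrays stand in for the contents of each DataBuffer.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;

public class Base64Chunks {

    // Concatenates all chunks in order, then encodes the result once.
    public static String encodeAll(List<byte[]> chunks) {
        ByteArrayOutputStream all = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            // This overload of write declares no checked exception.
            all.write(chunk, 0, chunk.length);
        }
        return Base64.getEncoder().encodeToString(all.toByteArray());
    }

    public static void main(String[] args) {
        List<byte[]> chunks = List.of(
                "hell".getBytes(StandardCharsets.UTF_8),
                "o".getBytes(StandardCharsets.UTF_8));
        System.out.println(encodeAll(chunks)); // aGVsbG8=
    }
}
```

Encoding "hell" and "o" separately would give "aGVsbA==" and "bw==", which is not a valid encoding of "hello" when concatenated.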

Upvotes: 0

nekperu15739

Reputation: 3698

To convert a DataBuffer to a String or a List, you can use Apache Commons IOUtils. In this sample I return a Flux, and to avoid a try/catch I wrapped the call in Mono.fromCallable.

protected Flux<String> getLines(final DataBuffer dataBuffer) {
    return Mono.fromCallable(() -> IOUtils.readLines(dataBuffer.asInputStream(), Charsets.UTF_8))
        .flatMapMany(Flux::fromIterable);
}
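
If adding Commons IO is not an option, the JDK alone can split a stream into lines. The sketch below assumes UTF-8 text; LineReader and readLines are hypothetical names, and the InputStream would be whatever dataBuffer.asInputStream() returns. Keep in mind that a single DataBuffer may end in the middle of a line, so buffers generally need to be joined before splitting.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

public class LineReader {

    // Reads the whole stream as UTF-8 text and returns its lines;
    // the stream is closed when reading finishes.
    public static List<String> readLines(InputStream in) {
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            return reader.lines().collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream("a\nb\nc".getBytes(StandardCharsets.UTF_8));
        System.out.println(readLines(in)); // [a, b, c]
    }
}
```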

Upvotes: 2

ek1984

Reputation: 131

I finally found the solution!

https://github.com/entzik/reactive-spring-boot-examples/blob/master/src/main/java/com/thekirschners/springbootsamples/reactiveupload/ReactiveUploadResource.java

I adapted the code in order to return an InputStream and it works fine with large files ;-)
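
For readers who cannot follow the link: since the data has to end up in a file before going to S3 anyway, one general way to avoid chaining streams is to spool the chunks to a temporary file and open a single InputStream over it. The following is a JDK-only sketch of that idea and not necessarily what the linked example does. TempFileSpool, spool and readAll are illustrative names, the "upload-" prefix is arbitrary, and the byte arrays stand in for DataBuffer contents.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

public class TempFileSpool {

    // Writes the chunks to a temp file in order and returns a stream over it.
    // The file is deleted when the returned stream is closed.
    public static InputStream spool(List<byte[]> chunks) {
        try {
            Path tmp = Files.createTempFile("upload-", ".part");
            try (OutputStream out = Files.newOutputStream(tmp)) {
                for (byte[] chunk : chunks) {
                    out.write(chunk);
                }
            }
            return Files.newInputStream(tmp, StandardOpenOption.DELETE_ON_CLOSE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Helper for the demo: reads everything as UTF-8 and closes the stream.
    public static String readAll(InputStream in) {
        try (in) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<byte[]> chunks = List.of(
                "first,".getBytes(StandardCharsets.UTF_8),
                "second".getBytes(StandardCharsets.UTF_8));
        System.out.println(readAll(spool(chunks))); // first,second
    }
}
```

The file writes here are blocking, so in a WebFlux application they would need to run off the event loop, for example on a scheduler intended for blocking work.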

Upvotes: 7
