Stuck

Reputation: 12302

How to stream a FilePart to the database (R2DBC Postgres)?

The question says it all. I could not find any information on streaming byte arrays to a PostgreSQL database in Spring using R2DBC, e.g. for a file upload.

I can store the bytes by reading all bytes of the file like this:

@PostMapping("/upload")
suspend fun upload(
  @RequestPart("file") filePartMono: Mono<FilePart>
): File {
  val filePart = filePartMono.awaitFirstOrNull() ?: throw UploadException("Missing file part")

  // Collect every DataBuffer of the multipart content into one byte array,
  // i.e. the whole file sits in memory before it is saved.
  val byteStream = ByteArrayOutputStream()
  filePart.content()
    .collectList()
    .awaitFirst()
    .forEach { dataBuffer ->
      val chunk = ByteArray(dataBuffer.readableByteCount())
      dataBuffer.read(chunk)
      DataBufferUtils.release(dataBuffer)
      byteStream.write(chunk)
    }

  val bytes = byteStream.toByteArray()

  return fileRepository.save(File(bytes)).awaitSingle()
}

But I would like to stream filePart.content() to the database instead. I am also interested in then streaming a bytea from Postgres through a controller back to the client.

Upvotes: 3

Views: 891

Answers (1)

Hantsy

Reputation: 9321

According to the README of https://github.com/pgjdbc/r2dbc-postgresql, a Postgres bytea (the BLOB-like binary type in Postgres) can be mapped to byte[], ByteBuffer and the R2DBC Blob automatically.
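For reference, all three bindings can be demonstrated at the DatabaseClient level. This is a minimal sketch of mine (not from the README), using the posts table defined below:

import io.r2dbc.spi.Blob;
import org.springframework.r2dbc.core.DatabaseClient;
import reactor.core.publisher.Mono;

import java.nio.ByteBuffer;

class ByteaBindings {

    // Binds the same payload three ways against the bytea columns of "posts".
    Mono<Long> insert(DatabaseClient client, byte[] data) {
        return client.sql("INSERT INTO posts(title, attachment, cover_image, cover_image_thumbnail) "
                        + "VALUES(:title, :attachment, :coverImage, :thumbnail)")
                .bind("title", "demo")
                .bind("attachment", ByteBuffer.wrap(data))                       // ByteBuffer
                .bind("coverImage", data)                                        // byte[]
                .bind("thumbnail", Blob.from(Mono.just(ByteBuffer.wrap(data)))) // R2DBC Blob
                .fetch()
                .rowsUpdated();
    }
}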

I tried mapping bytea to the above three types in a Spring Boot 3/Spring Data R2DBC project.

The schema file:

CREATE TABLE IF NOT EXISTS posts (
    -- id SERIAL PRIMARY KEY,
    -- uuid_generate_v4() requires: CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
    id UUID DEFAULT uuid_generate_v4(),
    title VARCHAR(255),
    content VARCHAR(255),
    metadata JSON default '{}',
    -- In this sample, VARCHAR stores the enum name; Spring Data R2dbc converts
    -- between a Java enum and a pg VARCHAR in both directions.
    status VARCHAR(255) default 'DRAFT',
    created_at TIMESTAMP, -- NOT NULL DEFAULT LOCALTIMESTAMP,
    updated_at TIMESTAMP,
    attachment bytea,
    cover_image bytea,
    cover_image_thumbnail bytea,
    version INTEGER,
    PRIMARY KEY (id)
);

And the mapped entity class:

import io.r2dbc.postgresql.codec.Json;
import io.r2dbc.spi.Blob;
import lombok.*;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.LastModifiedDate;
import org.springframework.data.annotation.Version;
import org.springframework.data.relational.core.mapping.Column;
import org.springframework.data.relational.core.mapping.Table;

import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.util.UUID;

@Data
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Table(value = "posts")
class Post {

    @Id
    @Column("id")
    private UUID id;

    @Column("title")
    private String title;

    @Column("content")
    private String content;

    @Column("metadata")
    private Json metadata;

    @Column("status")
    private Status status;

    @Column("attachment")
    private ByteBuffer attachment;

    @Column("cover_image")
    private byte[] coverImage;

    @Column("cover_image_thumbnail")
    private Blob coverImageThumbnail;

    @Column("created_at")
    @CreatedDate
    private LocalDateTime createdAt;

    @Column("updated_at")
    @LastModifiedDate
    private LocalDateTime updatedAt;

    @Column("version")
    @Version
    private Long version;

    enum Status {
        DRAFT, PENDING_MODERATION, PUBLISHED;
    }

}

Unfortunately, only the byte[] field worked well. See Spring Data Relational issue #1408.

If you want to use the other types (ByteBuffer and R2DBC Blob) at the moment, extra custom converters are required:

import io.r2dbc.spi.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.r2dbc.convert.R2dbcCustomConversions;
import org.springframework.data.r2dbc.dialect.DialectResolver;

import java.util.List;

@Configuration
public class DataR2dbcConfig {

    // see: https://github.com/spring-projects/spring-data-relational/issues/1408
    @Bean
    public R2dbcCustomConversions r2dbcCustomConversions(ConnectionFactory connectionFactory) {
        return R2dbcCustomConversions.of(
                DialectResolver.getDialect(connectionFactory),
                List.of(
                        new ByteArrayToByteBufferConverter(),
                        new ByteBufferToByteArrayConverter(),
                        new ByteArrayToBlobConverter(),
                        new BlobToByteArrayConverter()
                )
        );
    }
}


import io.r2dbc.spi.Blob;
import org.springframework.core.convert.converter.Converter;
import org.springframework.data.convert.ReadingConverter;
import org.springframework.data.convert.WritingConverter;
import reactor.core.publisher.Mono;

import java.nio.ByteBuffer;

@ReadingConverter
class ByteArrayToByteBufferConverter implements Converter<byte[], ByteBuffer> {

    @Override
    public ByteBuffer convert(byte[] source) {
        return ByteBuffer.wrap(source);
    }
}

@WritingConverter
class ByteBufferToByteArrayConverter implements Converter<ByteBuffer, byte[]> {

    @Override
    public byte[] convert(ByteBuffer source) {
        return source.array();
    }
}

@ReadingConverter
class ByteArrayToBlobConverter implements Converter<byte[], Blob> {

    @Override
    public Blob convert(byte[] source) {
        return Blob.from(Mono.just(ByteBuffer.wrap(source)));
    }
}

@WritingConverter
class BlobToByteArrayConverter implements Converter<Blob, byte[]> {

    @Override
    public byte[] convert(Blob source) {
        return Mono.from(source.stream()).block().array();
    }
}
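One caveat: the writing converter above consumes only the first ByteBuffer the Blob emits, and ByteBuffer#array() throws for direct or read-only buffers. A more defensive variant (my sketch, still blocking, since Converter is a synchronous SPI) could concatenate every chunk:

import reactor.core.publisher.Flux;

import java.io.ByteArrayOutputStream;

@WritingConverter
class BlobToByteArrayConcatenatingConverter implements Converter<Blob, byte[]> {

    @Override
    public byte[] convert(Blob source) {
        // Accumulate every emitted ByteBuffer; copying via get() avoids
        // relying on an accessible backing array.
        return Flux.from(source.stream())
                .reduce(new ByteArrayOutputStream(), (out, buffer) -> {
                    byte[] chunk = new byte[buffer.remaining()];
                    buffer.get(chunk);
                    out.write(chunk, 0, chunk.length);
                    return out;
                })
                .block()
                .toByteArray();
    }
}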

Import the config in PostRepositoryTest, and all of these types work now.

In the example project there is a PostController, which includes example endpoints for uploading and downloading using ByteBuffer.

    @PutMapping("{id}/attachment")
    public Mono<ResponseEntity<?>> upload(@PathVariable UUID id,
                                          @RequestPart Mono<FilePart> fileParts) {

        return Mono
                .zip(objects -> {
                            // zip's combinator receives one element per source publisher
                            var post = (Post) objects[0];
                            var dataBuffer = (DataBuffer) objects[1];
                            post.setAttachment(dataBuffer.toByteBuffer()); // copies the readable bytes
                            DataBufferUtils.release(dataBuffer);           // release the joined buffer
                            return post;
                        },
                        this.posts.findById(id),
                        // DataBufferUtils.join merges all content chunks into a single buffer
                        fileParts.flatMap(filePart -> DataBufferUtils.join(filePart.content()))
                )
                .flatMap(this.posts::save)
                .map(saved -> ResponseEntity.noContent().build());
    }


    @GetMapping("{id}/attachment")
    public Mono<Void> read(@PathVariable UUID id, ServerWebExchange exchange) {
        return this.posts.findById(id)
                .log()
                // wrap the stored ByteBuffer in a DataBuffer and write it to the response
                .map(post -> Mono.just(new DefaultDataBufferFactory().wrap(post.getAttachment())))
                .flatMap(r -> exchange.getResponse().writeWith(r));
    }
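Both endpoints above materialize the whole attachment in memory. For the download side, a Blob-mapped column can at least be forwarded chunk by chunk; a sketch of mine (assuming the cover_image_thumbnail field is mapped as io.r2dbc.spi.Blob, as in the entity above):

    @GetMapping("{id}/attachment-stream")
    public Mono<Void> readStreaming(@PathVariable UUID id, ServerWebExchange exchange) {
        var bufferFactory = exchange.getResponse().bufferFactory();
        // Blob#stream() is a Publisher<ByteBuffer>; each chunk is wrapped and
        // written to the response as it arrives.
        return this.posts.findById(id)
                .flatMap(post -> exchange.getResponse().writeWith(
                        Flux.from(post.getCoverImageThumbnail().stream())
                                .map(bufferFactory::wrap)));
    }

Note the Postgres driver still decodes a bytea value as part of the row, so this mainly avoids an extra copy on the web side rather than making the database read itself lazy.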

Update: added integration tests for the file uploading and downloading.
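A compressed sketch of such a round-trip test (mine, not the exact test from the repo; it assumes the controller is mapped under /posts and the repository interface is named PostRepository):

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.http.MediaType;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.reactive.function.BodyInserters;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@AutoConfigureWebTestClient
class PostAttachmentTests {

    @Autowired
    WebTestClient client;

    @Autowired
    PostRepository posts;

    @Test
    void uploadThenDownloadAttachment() {
        var saved = posts.save(Post.builder().title("test").content("test").build()).block();

        var multipart = new MultipartBodyBuilder();
        // the part name must match the @RequestPart parameter name: "fileParts"
        multipart.part("fileParts", new ByteArrayResource("hello".getBytes()) {
            @Override
            public String getFilename() {
                return "hello.txt";
            }
        });

        client.put().uri("/posts/{id}/attachment", saved.getId())
                .contentType(MediaType.MULTIPART_FORM_DATA)
                .body(BodyInserters.fromMultipartData(multipart.build()))
                .exchange()
                .expectStatus().isNoContent();

        client.get().uri("/posts/{id}/attachment", saved.getId())
                .exchange()
                .expectStatus().isOk()
                .expectBody(byte[].class).isEqualTo("hello".getBytes());
    }
}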

Update: spring-data-relational issue #1408 is resolved; with the latest versions the above converters are no longer needed.

Upvotes: 2
