Anshul
Anshul

Reputation: 109

Incorrect file being produced using websockets in helidon

I am trying to upload a file using websockets in Helidon.I think i am doing it write the right way but the code seems to be flaky in terms of the size of the file produced which is different. The size of the file being produced is different for different runs. How can i make sure that the file size is same on both ends? I use a simple protocol for handshake[code below]:

Step1 client sends filesize=11000 buffer=5000
Step2 server sends SENDFILE
Step3 client >> buffer 1  server >> write 1 5000
Step4 client >> buffer 2  server >> write 2 5000
Step5 client >> buffer 3  server >> write 3 1000
Step6 client sends ENDOFFILE  server >> session.close

    //SERVER side  OnOpen session below
    session.addMessageHandler(new MessageHandler.Whole<String>() {
    @Override
    public void onMessage(String message) {
            System.out.println("Server >> " + message);
                if (message.contains("FILESIZE")) {
                    session.getBasicRemote().sendText("SENDFILENOW");
                }
                if(message.contains("ENDOFFILE")) {
                    System.out.println("Server >> FILE_SIZE=" + FILE_SIZE);
                    finalFileOutputStream.close();
                    session.close();
                }
        }
    });
    session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
        @Override
        public void onMessage(ByteBuffer b) {
                finalFileOutputStream.write(b.array(), 0, b.array().length);
                finalFileOutputStream.flush();
    
        }
    });
    
        
    //CLIENT  OnOpen session below
session.getBasicRemote().sendText("FILESIZE=" + FILE_SIZE);
    session.addMessageHandler(new MessageHandler.Whole<String>() {
        @Override
        public void onMessage(String message) {
            long M = FILE_SIZE / BUFFER_SIZE;
            long R = FILE_SIZE % BUFFER_SIZE;
    
            if(!message.equals("SENDFILENOW"))
                return;
            try {
                System.out.println("Starting File read ... " + path + "  " + FILE_SIZE + "  " + M + "  " +message );
                byte[] buffer = new byte[(int) BUFFER_SIZE];
    
                while (M > 0) {
                    fileInputStream.read(buffer);
                    ByteBuffer bytebuffer = ByteBuffer.wrap(buffer);
                    session.getBasicRemote().sendBinary(bytebuffer);
                    M--;
                }
                buffer = new byte[(int) R];
                fileInputStream.read(buffer, 0, (int) R);
                fileInputStream.close();
                ByteBuffer bytebuffer = ByteBuffer.wrap(buffer);
                session.getBasicRemote().sendBinary(bytebuffer);
                session.getBasicRemote().sendText("FILEREADDONE");
                session.close();
                f.complete(true);
            } catch (IOException e) {
                fail("Unexpected exception " + e);
            }
        }
    });
    

Upvotes: 0

Views: 84

Answers (1)

Daniel Kec
Daniel Kec

Reputation: 559

Your solution is unnecessarily built on top of several levels of abstraction just to use websockets. Do you really need that? Helidon is very well equipped to handle huge file upload directly and much more efficiently.

public class LargeUpload {
    
    public static void main(String[] args) {
        ExecutorService executor = ThreadPoolSupplier.create("upload-thread-pool").get();

        WebServer server = WebServer.builder(Routing.builder()
                        .post("/streamUpload", (req, res) -> req.content()
                                .map(DataChunk::data)
                                .flatMapIterable(Arrays::asList)
                                .to(IoMulti.writeToFile(createFile(req.queryParams().first("fileName").orElse("bigFile.mkv")))
                                        .executor(executor)
                                        .build())

                                .onError(res::send)
                                .onComplete(() -> {
                                    res.status(Http.Status.ACCEPTED_202);
                                    res.send();
                                }).ignoreElement())
                        .build())
                .port(8080)
                .build()
                .start()
                .await(Duration.ofSeconds(10));

        // Server started - do upload

        //several gigs file
        Path file = Path.of("/home/kec/helidon-kafka.mkv");


        try (FileInputStream fis = new FileInputStream(file.toFile())) {

            WebClient.builder()
                    .baseUri("http://localhost:8080")
                    .build()
                    .post()
                    .path("/streamUpload")
                    .queryParam("fileName", "bigFile_" + System.currentTimeMillis() + ".mkv")
                    .contentType(MediaType.APPLICATION_OCTET_STREAM)
                    .submit(IoMulti.multiFromByteChannelBuilder(fis.getChannel())
                            .bufferCapacity(1024 * 1024 * 4)
                            .build()
                            .map(DataChunk::create)
                    )
                    .await(Duration.ofMinutes(10));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        executor.shutdown();
        server.shutdown()
                .await(Duration.ofSeconds(10));
    }

    static Path createFile(String path) {
        try {
            Path filePath = Path.of("/home/kec/tmp/" + path);
            System.out.println("Creating " + filePath);
            return Files.createFile(filePath);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

Upvotes: 3

Related Questions