L_DisplayName
L_DisplayName

Reputation: 31

Understanding and Resolving File Transferring issue using Netty


Greetings All,
I need help resolving a file transferring issue. I implemented Netty code to transfer a single 10MB binary file from one host (Node 0) to another host (Node 1) but only 8.5KB of the file gets transferred and I am having a hard time figuring out why. I am using ChunkWriteHandler to send 1MB chunks of the file at a time via ChunkedNioFile (Please see code below). In addition, I tried transferring files greater than 1MB, such as 100MB, 500MB and 1GB and only 8.5KB of the file is transferred. If I reduce the chunk size specified in the ChunkedNioFile from 1MB to 512KB or lower, then 17 KB gets transferred which is double the size of previous file transfers. Also, I tried using just ChunkedFile but I received the same transfer results. I can successfully transfer and receive the file headers: file name, file size (length) and the file offset (of where to start reading from or writting to), but only a few KB of the actual file. Can someone tell me what's going on and how I can fix this problem? (Below is the code).

Thank You,

Code Set up:

FileSenderInitializer.java - Initializes the channel pipeline with the channel handlers

public class FileSenderInitializer extends ChannelInitializer {

        @Override
        public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(
        //new LengthFieldPrepender(8),
        new ChunkedWriteHandler(),
        new FileSenderHandler());
        }
       }

FileSenderHandler.java - Sends the file header info - File Name, offset, length and then the Actual File

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
 try {
 String fileRequest = "ftp Node0/root/10MB_File.dat Node1/tmp/10MB_File_Copy.dat";

 //Source File to send / transfer to the Destination Node
 String theSrcFilePath =  "/root/10MB_File.dat";

 //File Name to write on the destination node, once the file is received  
 String theDestFilePath = "/tmp/10MB_File_Copy.dat";

//Get the source file to send
 File theFile = new File(theSrcFilePath);
 FileChannel theFileChannel = new RandomAccessFile(theFile, "r").getChannel();

//Get the length of the file
 long fileLength = theFileChannel.size();
 //Get the offset
 long offSet = 0;

 //Copy the offset to the ByteBuf
 ByteBuf offSetBuf = Unpooled.copyLong(offSet);
 //Copy the file length to the ByteBuf
 ByteBuf fileLengthBuf = Unpooled.copyLong(fileLength);

 //Get the Destination Filename (including the file path) in Bytes
 byte[] theDestFilePathInBytes = theDestFilePath.getBytes();
 //Get the length of theFilePath
 int theDestSize = theDestFilePathInBytes.length;
 //Copy the Dest File Path length to the ByteBuf
 ByteBuf theDestSizeBuf = Unpooled.copyInt(theDestSize);
 //Copy the theDestFilePathInBytes to the Byte Buf
 ByteBuf theDestFileBuf = Unpooled.copiedBuffer(theDestFilePathInBytes);

 //Send the file Headers: FileName Length, the FileName, the Offset and the file length
 ctx.write(theDestSizeBuf);
 ctx.write(theDestFileBuf);
 ctx.write(offSetBuf);
 ctx.write(fileLengthBuf);
 ctx.flush();

 //Send the 10MB File in 1MB chunks as specified by the following chunk size (1024*1024*1)
 ctx.write(new ChunkedNioFile(theFileChannel, offSet, fileLength, 1024 * 1024 * 1));
 ctx.flush();

 }catch(Exception e){
 System.err.printf("FileSenderHandler: Channel Active: Error: "+e.getMessage());
 e.printStackTrace();
 }
} //End channelActive

FileSender.java - Bootstraps the channel and connects this client/host to another host

  public static void main(String[] args) throws Exception {
     // Configure the client/ File Sender
     EventLoopGroup group = new NioEventLoopGroup();
     try {
     Bootstrap b = new Bootstrap();
     b.group(group)
     .channel(NioSocketChannel.class)
     .option(ChannelOption.TCP_NODELAY, true)
     .handler(new FileSenderInitializer());

     // Start the client.
     ChannelFuture f = b.connect(HOST, PORT).sync();

     // Wait until the connection is closed.
     //f.channel().closeFuture().sync();
     } finally {
     // Shut down the event loop to terminate all threads.
     group.shutdownGracefully();
     }
     }
}

FileReceiverInitializer.java - Initializes the channel pipeline with the channel handlers

public class FileReceiverInitializer extends ChannelInitializer<SocketChannel> {

 public FileReceiverInitializer(){

 }

@Override
 public void initChannel(SocketChannel ch) throws Exception {
 ch.pipeline().addLast( 
  //Read in 1MB data at a time (which is the max frame length), length field offset starts at 0, length of the length field is 8 bits, length adjustment is 0, strip the 8 bits representing the length field from the frame
 //new LengthFieldBasedFrameDecoder(1024*1024*1, 0, 8, 0, 8),
 new FileReceiverHandler());
 }
}

FileReceiverHandler.java - Receives the file header info - File Name, offset, length and then the actual file

public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
 while (msg.readableBytes() >= 1){
   //Read in the size of the File Name and it's directory path
   if (!fileNameStringSizeSet) {
     fileNameStringSizeBuf.writeBytes(msg, ((fileNameStringSizeBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : fileNameStringSizeBuf.writableBytes())); //INT_SIZE = 4 & LONG_SIZE = 8 (the byte size of an int and long)
     if (fileNameStringSizeBuf.readableBytes() >= INT_SIZE) {
       fileNameStringSize = fileNameStringSizeBuf.getInt(fileNameStringSizeBuf.readerIndex());//Get Size at index = 0;
       fileNameStringSizeSet = true;
   //Allocate a byteBuf to read in the actual file name and it's directory path
       fileNameStringBuf = ctx.alloc().buffer(fileNameStringSize);
    }
   } else if (!readInFileNameString) {
     //Read in the actual file name and it's corresponding directory path
     fileNameStringBuf.writeBytes(msg, ((fileNameStringBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : fileNameStringBuf.writableBytes()));
     if (fileNameStringBuf.readableBytes() >= fileNameStringSize) {
       readInFileNameString = true;
       //convert the data in the fileNameStringBuf to an ascii string
       thefileName = fileNameStringBuf.toString(Charset.forName("US-ASCII"));

       //Create file
       emptyFile = new File(thefileName); //file Name includes the directory path
       f = new RandomAccessFile(emptyFile, "rw");
       fc = f.getChannel();
    }
 }else if (!readInOffset) {
   offSetBuf.writeBytes(msg, ((offSetBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : offSetBuf.writableBytes()));
   if (offSetBuf.readableBytes() >= LONG_SIZE) {
     currentOffset = offSetBuf.getLong(offSetBuf.readerIndex());//Get Size at index = 0;
     readInOffset = true;
   }

 } else if (!readInFileLength) {
   fileLengthBuf.writeBytes(msg, ((fileLengthBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : fileLengthBuf.writableBytes()));
   //LONG_SIZE = 8
   if (fileLengthBuf.readableBytes() >= LONG_SIZE) {
   fileLength = fileLengthBuf.getLong(fileLengthBuf.readerIndex());//Get Size at index = 0;
   remainingFileLength = fileLength;
   readInFragmentLength = true;
  }
 } else {
   if (!readInCompleteFile) {
     if (msg.readableBytes() < remainingFileLength) {
       if (msg.readableBytes() > 0) {
         currentFileBytesWrote = 0
         while ( msg.readableBytes >= 1 ){
           int fileBytesWrote = fc.write(msg.nioBuffer(msg.readerIndex(), msg.readableBytes()), currentOffset);
           currentOffset += fileBytesWrote;
           remainingFileLength -= fileBytesWrote;
           msg.readerIndex(msg.readerIndex + fileBytesWrote);
         }
       }
     } else {
       int remainingFileLengthInt = (int) remainingFileLength;
       while (remainingFileLength >= 1){
         int fileBytesWrote = fc.write(msg.nioBuffer(msg.readerIndex(), remainingFileLengthInt), currentOffset);

         currentOffset += fileBytesWrote;
         remainingFileLength -= fileBytesWrote;
         remainingFileLengthInt-= fileBytesWrote;
         msg.readerIndex(msg.readerIndex + fileBytesWrote );
       }

      //Set readInCompleteFile to true
      readInCompleteFile = true;

    }
   }//End else if file chunk
  }//End Else
 }//End While
}//End Read Method

FileReceiver.java - Bootstraps the Server and accepts connections

public static void main(String[] args) throws Exception {
 // Configure the server
 EventLoopGroup bossGroup = new NioEventLoopGroup(1);
 EventLoopGroup workerGroup = new NioEventLoopGroup();
 try {
 ServerBootstrap b = new ServerBootstrap();
 b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new FileReceiverInitializer())
 .childOption(ChannelOption.AUTO_READ, true) 
 .bind(LOCAL_PORT).sync().channel().closeFuture().sync();
 } finally {
 bossGroup.shutdownGracefully();
 workerGroup.shutdownGracefully();
 }
}

-- 

Upvotes: 0

Views: 730

Answers (3)

L_DisplayName
L_DisplayName

Reputation: 31

The problem was that when the main FileSender.java Application finished executing it's code it would terminate, thus causing the FileSenderHandler to terminate. However, to block the main FileSender.java Application from terminating I used the following statement: f.channel().closeFuture().sync();. in which f is a ChannelFuture rendered from connecting to the server via the call: b.connect(HOST, PORT).sync(); This would keep the FileSender up and allow the fileSenderHandler to send all information without terminating early.

However, my new questions is: How can the application close the channel and cause the main application to unblock once all data is sent and acknowledged? currently it is blocked from calling f.channel().closeFuture().sync();. but after I send all the data and receive acknowledgement, how can I unblock the main application. I thought if I closed the channel, the closeFuture would be returned as true, thus unblocking the main application. In addition, I tried closing the channel from within the FileSenderHandler and the FileReceiverHandler by using ctx.channel().close(), but the channel did not close and unblock the main application.

The reason I need to unblock the application is so I can print the throughput to the console after all data has been sent and acknowledged. if I have multiple data channels and the program is blocked only the first data channel handler throughput will be printed. So the FileSender.java would look as below. But even if I have one data Channel and I attempt to close the channel in FileSenderHandler, the main app (FileSender.java) still block and hangs on ChannelFuture.channel().closeFuture().sync(); To quit, I must type control C at the terminal. ANY IDEAS ON HOW I CAN UNBLOCK THE MAIN APP ONCE ALL DATA IS SENT AND RECEIVED?

FileSender.java - Bootstraps the channel and connects this client/host to another host

public static void main(String[] args) throws Exception {
 // Configure the client/ File Sender
 EventLoopGroup group = new NioEventLoopGroup();
 try {
for (int i =0; i<numOfDataChannels; i++) {
 Bootstrap b = new Bootstrap();
 b.group(group)
 .channel(NioSocketChannel.class)
 .option(ChannelOption.TCP_NODELAY, true)
 .handler(new FileSenderInitializer());

 // Start the client.
 ChannelFuture f = b.connect(HOST, PORT).sync();

  addChannelFutureToList(f);
}

 // Wait until the connection is closed for each data channel, but also who can actually close the channel
 for ( ChannelFuture f: channelFutureList){
   f.channel().closeFuture().sync();
}

//When Channel is closed PRINT THROUGHPUT  OF ALL THE DATA CHANNELS
printThroughput();
 } finally {
 // Shut down the event loop to terminate all threads.
 group.shutdownGracefully();
 }
 }
}

FileSenderHandler.java - Handles I/O Channel events such as Read/Write

public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
try {
 .
 .
 //After received msg Ack, close the channel, this should unblock the main application (FileSender.java) since after closing the channel closeFuture will be fulfilled
 ctx.channel().close();

}catch(Exception e){
    System.err.printf("ChannelRead Error Msg: " + e.getMessage());
    e.printStackTrace();

}

Upvotes: 1

Frederic Br&#233;gier
Frederic Br&#233;gier

Reputation: 2216

Another reason maybe : client side, you shutdown immediately the group once connected . it could be the reason, since the client could "abort" the transfer therefore the server will not have the full transfer ?

Upvotes: 0

Frederic Br&#233;gier
Frederic Br&#233;gier

Reputation: 2216

Perhaps I'm wrong but the following is strange to me:

       int fileBytesWrote = fc.write(msg.nioBuffer(msg.readerIndex(), msg.readableBytes()), currentOffset);
       currentOffset += fileBytesWrote;
       remainingFileLength -= fileBytesWrote;
       msg.readerIndex(msg.readerIndex + fileBytesWrote); 
       // msg.readerIndex (or msg.readerIndex() ?) changed already

You might want to backup readerIndex() value before doing this assignment.

Having a few KB seems related to either: - you do not consume all packets as you wanted to be (only the first one?) - you read it not correctly (skipping some bytes as I suspect in the code shown before)

Could you trace each read operation (server side) ? It might help you (knowing how much bytes you received, how much you wrote, what are the readerIndex/readableBytes/offset for instance).

Upvotes: 0

Related Questions