IFH

Reputation: 161

Which Java file in Hadoop HDFS splits files into blocks?

As we all know, when a text file is copied from the local file system into HDFS, it is split into fixed-size blocks of 128 MB. For example, when I copy a 256 MB text file into HDFS, there will be 2 blocks (256/128) containing the split file.

Can someone please tell me which java/jar file in the Hadoop 2.7.1 source code implements the splitting of the file into blocks, and which java/jar file writes the blocks into the datanode's directory?

Help me trace this code.

I only found the code that does the logical input splits for blocks, which is in FileInputFormat.java, and that is not what I need. I need the Java file where the physical file is split.
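
For context, this is roughly how I observe the split (a rough sketch; the paths are placeholders for my setup): copying the file through the Java FileSystem API and then listing its block locations shows the 256 MB file ending up in 2 physical blocks.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ShowBlocks {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();        // picks up core-site.xml / hdfs-site.xml
        FileSystem fs = FileSystem.get(conf);

        // Copy the local file into HDFS; the client splits it into blocks while writing.
        Path src = new Path("file:///tmp/input.txt");    // placeholder local path
        Path dst = new Path("/user/hadoop/input.txt");   // placeholder HDFS path
        fs.copyFromLocalFile(src, dst);

        // List the physical blocks the file was stored in (2 for a 256 MB file).
        FileStatus status = fs.getFileStatus(dst);
        BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
        System.out.println("Number of blocks: " + blocks.length);
    }
}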

Upvotes: 1

Views: 664

Answers (2)

Manjunath Ballur

Reputation: 6343

The code that writes data into the DataNodes is present in two files:

  • DFSOutputStream.java (package: org.apache.hadoop.hdfs)

    The data written by the client is split into packets (typically 64 KB in size). When a packet of data is ready, it gets enqueued into a data queue, which is picked up by the DataStreamer (a simplified sketch of this hand-off follows after this list).

  • DataStreamer (package: org.apache.hadoop.hdfs)

    It picks up the packets from the data queue and sends them to the Data Nodes in the pipeline (typically there are 3 Data Nodes in a pipeline, because of the default replication factor of 3).

    It retrieves a new block ID and starts streaming the data to the Data Nodes. When a block of data has been written, it closes the current block and gets a new block for writing the next set of packets.

    The code where a new block is obtained is below:

    // get new block from namenode.
    if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
      if(LOG.isDebugEnabled()) {
        LOG.debug("Allocating new block");
      }
      setPipeline(nextBlockOutputStream());
      initDataStreaming();
    }
    

    The code where the current block gets closed is below:

    // Is this block full?
    if (one.isLastPacketInBlock()) {
      // wait for the close packet has been acked
      synchronized (dataQueue) {
        while (!shouldStop() && ackQueue.size() != 0) {
          dataQueue.wait(1000);// wait for acks to arrive from datanodes
        }
      }
      if (shouldStop()) {
        continue;
      }
    
      endBlock();
    }
    

    In the endBlock() method, the stage is again set to:

    stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
    

    which means a new pipeline is created for writing the next set of packets to a new block.
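
To make the hand-off easier to picture, here is a heavily simplified, self-contained sketch of the idea (it is not Hadoop's actual code; the class and variable names are made up for illustration): a writer enqueues fixed-size packets into a queue, and a background streamer thread drains them, closing the current block and requesting a new one whenever blockSize bytes have been sent.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified illustration only: a "writer" enqueues 64 KB packets and a
// "streamer" thread drains the queue, closing a block every 128 MB.
public class PacketPipelineSketch {
    static final int PACKET_SIZE = 64 * 1024;            // ~64 KB per packet
    static final long BLOCK_SIZE = 128L * 1024 * 1024;   // 128 MB block

    public static void main(String[] args) throws Exception {
        BlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<>();

        // Stands in for DataStreamer: tracks bytes written to the current block.
        Thread streamer = new Thread(() -> {
            long bytesCurBlock = 0;
            int blockId = 0;
            try {
                while (true) {
                    byte[] packet = dataQueue.take();
                    if (packet.length == 0) {
                        break;                            // empty packet = end of stream
                    }
                    bytesCurBlock += packet.length;       // plays the role of incBytesCurBlock()
                    if (bytesCurBlock >= BLOCK_SIZE) {    // block boundary reached
                        System.out.println("Closing block " + blockId++ + ", requesting a new one");
                        bytesCurBlock = 0;                // plays the role of endBlock()
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        streamer.start();

        // Stands in for DFSOutputStream: a 256 MB write split into 64 KB packets.
        long toWrite = 256L * 1024 * 1024;
        while (toWrite > 0) {
            int len = (int) Math.min(PACKET_SIZE, toWrite);
            dataQueue.put(new byte[len]);
            toWrite -= len;
        }
        dataQueue.put(new byte[0]);                       // signal end of stream
        streamer.join();
    }
}

Running it prints two block closes for the 256 MB write, mirroring the 2 blocks such a file ends up in on HDFS.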

EDIT: How is the end of a block detected?

As DataStreamer keeps appending data to a block, it updates the number of bytes written.

/**
  * increase bytes of current block by len.
  *
  * @param len how many bytes to increase to current block
  */
void incBytesCurBlock(long len) {
    this.bytesCurBlock += len;
}

It also keeps checking whether the number of bytes written has reached the block size:

// If packet is full, enqueue it for transmission
//
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
    getStreamer().getBytesCurBlock() == blockSize) {
  enqueueCurrentPacketFull();
}

In the statement above, the following condition checks whether the block size has been reached:

getStreamer().getBytesCurBlock() == blockSize

If the block boundary is encountered, the endBlock() method gets called:

/**
 * if encountering a block boundary, send an empty packet to
 * indicate the end of block and reset bytesCurBlock.
 *
 * @throws IOException
 */
protected void endBlock() throws IOException {
    if (getStreamer().getBytesCurBlock() == blockSize) {
      setCurrentPacketToEmpty();
      enqueueCurrentPacket();
      getStreamer().setBytesCurBlock(0);
      lastFlushOffset = 0;
    }
}

This ensures that the current block gets closed and a new block is obtained from the NameNode for writing the data.
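
The arithmetic behind the boundary check is simple (a rough sketch using the typical default sizes mentioned above, not values read from any config): a 128 MB block holds exactly 2048 full 64 KB packets, so after the 2048th packet getBytesCurBlock() equals blockSize and endBlock() fires.

public class BlockBoundaryMath {
    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;    // 134217728 bytes, the dfs.blocksize used above
        long packetSize = 64L * 1024;           // idealized 64 KB packet payload

        long packetsPerBlock = blockSize / packetSize;
        System.out.println("Packets per block: " + packetsPerBlock);             // 2048

        long bytesCurBlock = packetsPerBlock * packetSize;
        System.out.println("Boundary reached: " + (bytesCurBlock == blockSize)); // true
    }
}

In the real code the packet payload is not exactly 64 KB (it is built from 512-byte checksummed chunks, and the packet near a block boundary is sized so that it does not cross it), but the bookkeeping is the same: bytesCurBlock grows until it equals blockSize.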

The block size is determined by the dfs.blocksize parameter in the hdfs-site.xml file (it is set to 128 MB = 134217728 bytes in my cluster):

<property>
    <name>dfs.blocksize</name>
    <value>134217728</value>
    <description>The default block size for new files, in bytes.
        You can use the following suffix (case insensitive): k(kilo),
        m(mega), g(giga), t(tera), p(peta), e(exa) to specify the
        size (such as 128k, 512m, 1g, etc.), Or provide complete size
        in bytes (such as 134217728 for 128 MB).
    </description>
</property>
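
If you want to check the value programmatically, something like this should work (a small sketch; it assumes the hdfs-site.xml above is on the classpath):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ShowBlockSize {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // loads hdfs-site.xml from the classpath

        // Raw dfs.blocksize value; getLongBytes also understands the k/m/g suffixes.
        long configured = conf.getLongBytes("dfs.blocksize", 128L * 1024 * 1024);
        System.out.println("dfs.blocksize = " + configured);

        // The block size the FileSystem will actually use for new files under this path.
        FileSystem fs = FileSystem.get(conf);
        System.out.println("Default block size = " + fs.getDefaultBlockSize(new Path("/")));
    }
}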

Upvotes: 1

Shravanya

Reputation: 97

It is not a single jar/java file that does the job of splitting the file; it is the HDFS client that performs this task. When you load a file from the local file system, the client reads only 128 MB at a time, finds a place to store it by asking the NameNode, and makes sure the block is copied and replicated properly. At this stage the client does not know the real size of the file; it simply keeps writing blocks in the same fashion until the whole file has been read.

The FileInputFormat.java that you have mentioned is not used by HDFS when you store a file. It is used when you want to run a MapReduce job on that file; it has nothing to do with the storage of the file.
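
To make the distinction concrete, here is a rough sketch (the input path is a placeholder): FileInputFormat only computes logical splits on the client for MapReduce, and changing the split size changes the number of splits without touching the physical blocks already stored in HDFS.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class LogicalSplitsDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        FileInputFormat.addInputPath(job, new Path("/user/hadoop/input.txt")); // placeholder path

        // Default: roughly one logical split per HDFS block (e.g. 2 for a 256 MB file).
        System.out.println("Splits: " + new TextInputFormat().getSplits(job).size());

        // Ask for smaller logical splits; the file's physical blocks in HDFS do not change.
        FileInputFormat.setMaxInputSplitSize(job, 32L * 1024 * 1024);   // 32 MB
        System.out.println("Splits: " + new TextInputFormat().getSplits(job).size());
    }
}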

Upvotes: 0
