Dmitry Goldenberg

Reputation: 317

What is the recommended way to append to files on HDFS?

I'm having trouble figuring out a safe way to append to files in HDFS.

I'm using a small, 3-node Hadoop cluster (CDH v.5.3.9, to be specific). Our process is a multi-threaded (8 threads) data pipeliner with a stage that appends lines of delimited text to files in a dedicated directory on HDFS. I'm using locks to synchronize the threads' access to the buffered writers that append the data.

My first issue is deciding on the approach generally.

Approach A is to open the file, append to it, then close it, for every line appended. This seems slow and seems likely to create too many small blocks; at least, I've seen that sentiment expressed in various posts.

Approach B is to cache the writers but periodically refresh them to make sure the list of writers doesn't grow unbounded (currently it's one writer per input file processed by the pipeliner). This seems like a more efficient approach, but I imagine that keeping streams open over a period of time, however controlled, may be an issue, especially for readers of the output files (?)
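
To make Approach B concrete, here is a minimal sketch of what I have in mind (the class and method names are hypothetical, not taken from our actual pipeliner):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical sketch of Approach B: one cached writer per output file,
// with a periodic refresh so the set of open streams stays bounded.
public class WriterCache {
  private final FileSystem fs;
  private final Map<String, FSDataOutputStream> writers = new HashMap<String, FSDataOutputStream>();

  public WriterCache(FileSystem fs) {
    this.fs = fs;
  }

  // Returns the cached writer for this path, opening it (append or create) on first use.
  public synchronized FSDataOutputStream getWriter(Path path) throws IOException {
    FSDataOutputStream out = writers.get(path.toString());
    if (out == null) {
      out = fs.exists(path) ? fs.append(path) : fs.create(path);
      writers.put(path.toString(), out);
    }
    return out;
  }

  // Called periodically (e.g. every N appends) to close and drop all cached writers.
  public synchronized void refresh() throws IOException {
    for (FSDataOutputStream out : writers.values()) {
      out.close();
    }
    writers.clear();
  }
}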

Beyond this, I have two concrete problems. I'm using the Hadoop FileSystem Java API to do the appending, and I intermittently get these two exceptions:

org.apache.hadoop.ipc.RemoteException: failed to create file /output/acme_20160524_1.txt for DFSClient_NONMAPREDUCE_271210261_1 for client XXX.XX.XXX.XX because current leaseholder is trying to recreate file.

org.apache.hadoop.ipc.RemoteException: BP-1999982165-XXX.XX.XXX.XX-1463070000410:blk_1073760252_54540 does not exist or is not under Construction blk_1073760252_54540{blockUCState=UNDER_RECOVERY, primaryNodeIndex=1, replicas=[ReplicaUnderConstruction[[DISK]DS-ccdf4e55-234b-4e17-955f-daaed1afdd92:NORMAL|RBW], ReplicaUnderConstruction[[DISK]DS-1f66db61-759f-4c5d-bb3b-f78c260e338f:NORMAL|RBW]]}

Anyone have any ideas on either of those?

For the first problem, I've tried implementing the logic discussed in this post, but it didn't seem to help.

I'm also interested in the role of the dfs.support.append property, if at all applicable.
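
If that property still matters on this version (I believe appends are enabled by default on HDFS 2.x, so this may be a no-op), I'd expect it to be set on the client Configuration along these lines:

Configuration conf = new Configuration();
// dfs.support.append should already default to true on HDFS 2.x;
// setting it explicitly here is presumably harmless if it's no longer honored.
conf.setBoolean("dfs.support.append", true);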

My code for getting the file system:

userGroupInfo = UserGroupInformation.createRemoteUser("hdfs");
final Configuration conf = new Configuration();
conf.set(key1, val1);
...
conf.set(keyN, valN);
fileSystem = userGroupInfo.doAs(new PrivilegedExceptionAction<FileSystem>() { 
  public FileSystem run() throws Exception { 
   return FileSystem.get(conf);
  }
});

My code for getting the OutputStream:

org.apache.hadoop.fs.Path file = ...
public OutputStream getOutputStream(boolean append) throws IOException {
  OutputStream os = null;
  synchronized (file) {
    if (fs.isFile(file)) {
      // The file already exists: append to it, or recreate it (overwrite)
      os = (append) ? fs.append(file) : fs.create(file, true);
    } else if (append) {
      // Create the file first, to avoid "failed to append to non-existent file" exception
      FSDataOutputStream dos = fs.create(file);
      dos.close();
      // or, this can be: fs.createNewFile(file);
      os = fs.append(file);
    } else {
      // Creating a new file
      os = fs.create(file);
    }
  }
  return os;
}
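
For context, a simplified, hypothetical example of how one of the pipeliner threads might use this method to append a single delimited line (error handling trimmed down; assumes java.nio.charset.StandardCharsets is imported):

OutputStream os = getOutputStream(true); // true = append mode
try {
  os.write((line + "\n").getBytes(StandardCharsets.UTF_8));
} finally {
  os.close(); // Approach A style: open, append one line, close
}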

Upvotes: 2

Views: 4008

Answers (1)

Dmitry Goldenberg

Reputation: 317

I got file appending working with CDH 5.3 / HDFS 2.5.0. My conclusions so far are as follows:

  • Cannot have one dedicated thread doing appends per file, or multiple threads writing to multiple files, whether we’re writing data via one and the same instance of the HDFS API FileSystem, or different instances.
  • Cannot refresh (i.e. close and reopen) the writers; they must stay open.
  • This last item leads to an occasional, relatively rare ClosedChannelException, which appears to be recoverable (by retrying the append).
  • We use a single-threaded executor service with a blocking queue (one queue for appends to all files); one writer per file, and the writers stay open (until the end of processing, when they're closed). A rough sketch is at the end of this answer, after the stack trace.
  • When we upgrade to a CDH newer than 5.3, we'll want to revisit this and see which threading strategy makes sense: a single thread only, one thread per file, or multiple threads writing to multiple files. Additionally, we'll want to see whether writers can be/need to be periodically closed and reopened.
  • In addition, I have seen the following error, and was able to make it go away by setting 'dfs.client.block.write.replace-datanode-on-failure.policy' to 'NEVER' on the client side.
java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[XXX.XX.XXX.XX:50010, XXX.XX.XXX.XX:50010], original=[XXX.XX.XXX.XX:50010, XXX.XX.XXX.XX:50010]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.findNewDatanode(DFSOutputStream.java:969) ~[hadoop-hdfs-2.5.0.jar:?]
       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1035) ~[hadoop-hdfs-2.5.0.jar:?]
       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1184) ~[hadoop-hdfs-2.5.0.jar:?]
       at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:532) ~[hadoop-hdfs-2.5.0.jar:?] 
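
For reference, the property name comes straight from the message above; on the client side the setting looks like this:

Configuration conf = new Configuration();
// On a small (3-node) cluster there is often no spare datanode to swap into the
// write pipeline, so tell the client not to attempt the replacement at all.
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");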

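Lastly, a rough, hypothetical sketch of the single-threaded executor setup described above (class name, queueing, and retry details are illustrative only, not our exact code):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical sketch: all appends are funneled through one executor thread,
// with one writer per file that stays open until shutdown.
public class HdfsAppendService {
  private final FileSystem fs;
  // Single-threaded executor; its internal blocking queue serializes appends to all files.
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  // Only touched from the executor thread (and from shutdown() after it has drained).
  private final Map<String, FSDataOutputStream> writers = new HashMap<String, FSDataOutputStream>();

  public HdfsAppendService(FileSystem fs) {
    this.fs = fs;
  }

  // Called by the pipeliner threads; the actual append happens on the executor thread.
  public void append(final Path path, final String line) {
    executor.submit(new Runnable() {
      public void run() {
        try {
          FSDataOutputStream out = writers.get(path.toString());
          if (out == null) {
            out = fs.exists(path) ? fs.append(path) : fs.create(path);
            writers.put(path.toString(), out);
          }
          out.write(line.getBytes("UTF-8"));
        } catch (IOException e) {
          // e.g. the occasional ClosedChannelException; in practice we retry the append.
        }
      }
    });
  }

  // At the end of processing: stop accepting work, drain the queue, close the writers.
  public void shutdown() throws IOException, InterruptedException {
    executor.shutdown();
    executor.awaitTermination(10, TimeUnit.MINUTES);
    for (FSDataOutputStream out : writers.values()) {
      out.close();
    }
  }
}
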
Upvotes: 2
