Reputation: 13632
I have a Flink job that writes data to a destination using a TextOutputFormat. The code looks like this:
String basePath = "/Users/me/out";
// String basePath = "hdfs://10.199.200.204:9000/data";
// ensure we have a format for this.
TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath, selection + "/" + uid));
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
format.configure(GlobalConfiguration.getConfiguration());
format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
// then serialize and write.
String record = serializationFunction.map(value);
log.info("Writing " + record);
format.writeRecord(record);
This works perfectly fine when the destination is a path on the local file system. However, when I change the base path to an HDFS location, it no longer works as expected: the output file is created on HDFS, but it has a size of zero bytes. No exceptions are thrown during the call.
I'm using Hadoop 2.6.0 and Flink 0.10.1. Copying files to HDFS using the command line tools (hadoop fs -put ...) works, so I think I can rule out a Hadoop misconfiguration. I also started Wireshark and saw the data being transmitted to the Hadoop server, so do I need to commit it somehow before it is actually written?
Upvotes: 3
Views: 1080
Reputation: 13632
I found out why it happened. There are actually two reasons:
The output format wasn't flushed, as Till Rohrmann pointed out. Since I use the format in a streaming job, closing the format was not an option. I resorted to writing my own format, which can be flushed:
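import java.io.IOException;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.core.fs.Path;

public class MyTextOutputFormat<T> extends TextOutputFormat<T> {
    public MyTextOutputFormat(Path outputPath) {
        super(outputPath);
    }

    public MyTextOutputFormat(Path outputPath, String charset) {
        super(outputPath, charset);
    }

    // Custom flush method: pushes buffered bytes to the underlying
    // stream without closing the format.
    public void flush() throws IOException {
        stream.flush();
    }
}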
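The effect of the missing flush can be reproduced without Flink or HDFS. The following is only a minimal sketch using plain java.io classes as stand-ins: FlushDemo and sizesAroundFlush are hypothetical names, a ByteArrayOutputStream plays the role of the remote file, and a BufferedOutputStream plays the role of the buffering layer. It is an analogy for the behaviour described above, not the actual Flink or Hadoop stream implementation.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in: the ByteArrayOutputStream plays the role of the file.
class FlushDemo {

    /** Returns the sink size {before flush, after flush}; the stream is never closed. */
    static int[] sizesAroundFlush(String record) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            OutputStream buffered = new BufferedOutputStream(sink, 4096);

            buffered.write((record + "\n").getBytes(StandardCharsets.UTF_8));
            int before = sink.size(); // the record still sits in the 4096-byte buffer

            buffered.flush();         // push the buffered bytes to the sink, stream stays open
            int after = sink.size();

            return new int[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] sizes = sizesAroundFlush("hello");
        System.out.println("before flush: " + sizes[0] + ", after flush: " + sizes[1]);
        // prints "before flush: 0, after flush: 6"
    }
}
```

In the streaming job, the equivalent step would be a call to the custom flush() method after writeRecord; how often to flush is a trade-off between latency and throughput that this sketch does not address.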
I run HDFS in a VM guest and connect to it from the VM host. By default, Flink's HDFS client connects to the datanode using the datanode's IP address. However, the datanode's IP address was reported as 127.0.0.1, so Flink tried to connect to 127.0.0.1, and of course no HDFS datanode was running there on the host system. This only showed up after I added the manual flush operation. To fix this, I had to change two things:
Inside the VM guest, modify $HADOOP_HOME/etc/hadoop/hdfs-site.xml and add:
<property>
  <name>dfs.datanode.hostname</name>
  <value>10.199.200.204</value> <!-- IP of my VM guest -->
</property>
This change made the namenode report the correct, routable host name of the datanode. It is actually an undocumented setting, but it seems to work.
On the system where Flink actually runs, I had to create an hdfs-site.xml in a folder (e.g. /home/me/conf) and then set the environment variable HADOOP_CONF_DIR to point to /home/me/conf. The file had the following contents:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>dfs.client.use.datanode.hostname</name>
    <value>true</value>
  </property>
</configuration>
This change instructed the Hadoop client to use the host name instead of the IP address when connecting to the datanode. After these changes, my data was written to HDFS correctly.
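Because a missing or misplaced hdfs-site.xml fails silently, it can help to check that the file really contains the expected property. The following is a rough sketch using only the JDK's XML parser: HdfsSiteCheck and propertyValue are hypothetical helper names, not part of Hadoop or Flink, and the sketch only inspects the file contents. It does not prove that the client actually picked up the configuration.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper: looks up one property in Hadoop-style configuration XML.
class HdfsSiteCheck {

    /** Returns the trimmed <value> of the named property, or null if it is absent. */
    static String propertyValue(String xml, String name) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            NodeList props = doc.getElementsByTagName("property");
            for (int i = 0; i < props.getLength(); i++) {
                Element prop = (Element) props.item(i);
                NodeList names = prop.getElementsByTagName("name");
                NodeList values = prop.getElementsByTagName("value");
                if (names.getLength() > 0 && values.getLength() > 0
                        && name.equals(names.item(0).getTextContent().trim())) {
                    return values.item(0).getTextContent().trim();
                }
            }
            return null;
        } catch (Exception e) {
            throw new IllegalArgumentException("not a readable configuration file", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
                + "<configuration><property>"
                + "<name>dfs.client.use.datanode.hostname</name><value>true</value>"
                + "</property></configuration>";
        System.out.println(propertyValue(xml, "dfs.client.use.datanode.hostname"));
    }
}
```

To check the real file, the XML string could be read from the hdfs-site.xml inside the directory named by the HADOOP_CONF_DIR environment variable instead of being hard-coded.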
Upvotes: 0
Reputation: 13346
In order to flush the results out to HDFS, you have to call the close method of the TextOutputFormat after you've finished writing the records.
// do writing
while (some condition) {
format.writeRecord(record);
}
// finished writing
format.close();
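For a bounded write like the loop above, the close call is easy to lose when an exception interrupts the loop. The following is a sketch of the same write-then-close shape using plain java.io classes: CloseDemo and writeAll are hypothetical names, and a StringWriter stands in for the output file. With the Flink format itself, a try/finally block around format.close() would be the analogous structure; whether the format can be used directly in a try-with-resources statement is an assumption this sketch does not make.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in: the StringWriter plays the role of the output file.
class CloseDemo {

    /** Writes every record on its own line, closes the writer, returns what reached the sink. */
    static String writeAll(List<String> records) {
        StringWriter sink = new StringWriter();
        // try-with-resources closes (and thereby flushes) the writer on every exit path
        try (BufferedWriter out = new BufferedWriter(sink)) {
            for (String record : records) {
                out.write(record);
                out.write('\n');
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toString();
    }

    public static void main(String[] args) {
        System.out.print(writeAll(Arrays.asList("first", "second")));
    }
}
```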
Upvotes: 2