Reputation: 1009
I have an HDFS directory with several files and I want to merge them into one. I do not want to do this with Spark DataFrames but with plain HDFS interactions using data streams. Here is my code so far:
...
sc = SparkContext()
hadoop = sc._jvm.org.apache.hadoop
hdfs = hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())

out_stream = hdfs.create(hadoop.fs.Path('/tmp/combinedFile.parquet'))  # FSDataOutputStream

for f in hdfs.listStatus(hadoop.fs.Path('/tmp/dirWithSeparatedFiles/')):
    buffer = bytes(256)
    in_stream = hdfs.open(f.getPath())  # FSDataInputStream
    bytesRead = in_stream.read(buffer)
    while (bytesRead > 0):
        out_stream.writeBytes(bytesRead)
    out_stream.flush()
    in_stream.close()

out_stream.close()
The first problem with this code is that I am unsure how to read the data from the input stream into a buffer. The second problem is that the output file gets created in HDFS, but nothing gets written to it (even when I write fixed values to it).
Upvotes: 2
Views: 2394
Reputation: 1009
After some investigation, I found a solution for my problem. The solution involves creating some JVM objects via the Spark context and using them for buffered I/O operations:
...
sc = SparkContext()
hadoop = sc._jvm.org.apache.hadoop
hdfs = hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())

raw_out = hdfs.create(hadoop.fs.Path('/tmp/combinedFile.parquet'))  # FSDataOutputStream
out_stream = sc._jvm.java.io.BufferedOutputStream(raw_out)

for f in hdfs.listStatus(hadoop.fs.Path('/tmp/dirWithSeparatedFiles/')):
    raw_in = hdfs.open(f.getPath())  # FSDataInputStream
    in_stream = sc._jvm.java.io.BufferedInputStream(raw_in)
    while in_stream.available() > 0:
        out_stream.write(in_stream.read())
    out_stream.flush()
    in_stream.close()

out_stream.close()
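As a side note: copying byte by byte over the py4j bridge can be slow, since every read() and write() is a separate round trip to the JVM. The copy loop could probably be pushed entirely onto the JVM side with Hadoop's org.apache.hadoop.io.IOUtils.copyBytes helper, which takes an input stream, an output stream, the Hadoop configuration, and a flag controlling whether the streams are closed afterwards. This is only a minimal sketch of that variant, assuming IOUtils is available on the classpath (it ships with Hadoop), not something I have benchmarked:

from pyspark import SparkContext

sc = SparkContext()
hadoop = sc._jvm.org.apache.hadoop
conf = sc._jsc.hadoopConfiguration()
hdfs = hadoop.fs.FileSystem.get(conf)

out_stream = hdfs.create(hadoop.fs.Path('/tmp/combinedFile.parquet'))  # FSDataOutputStream

for f in hdfs.listStatus(hadoop.fs.Path('/tmp/dirWithSeparatedFiles/')):
    in_stream = hdfs.open(f.getPath())  # FSDataInputStream
    # copyBytes(in, out, conf, close): with close=False the copy runs entirely
    # in the JVM and out_stream stays open so the next file can be appended
    hadoop.io.IOUtils.copyBytes(in_stream, out_stream, conf, False)
    in_stream.close()

out_stream.close()

If you are on Hadoop 2.x, there is also a FileUtil.copyMerge helper that merges a whole directory into one file in a single call, though it was removed in later Hadoop versions.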
Upvotes: 3