Sebastian Dine

Reputation: 1009

PySpark HDFS data streams reading/writing

I have an HDFS directory with several files and I want to merge them into one. I do not want to do this with Spark DataFrames, but via direct HDFS interactions using data streams. Here is my code so far:

...
sc = SparkContext()
hadoop = sc._jvm.org.apache.hadoop
hdfs = hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())

out_stream = hdfs.create(hadoop.fs.Path('/tmp/combinedFile.parquet'))  # FSDataOutputStream

for f in hdfs.listStatus(hadoop.fs.Path('/tmp/dirWithSeparatedFiles/')):

    buffer = bytes(256)
    in_stream = hdfs.open(f.getPath())  # FSDataInputStream

    bytesRead = in_stream.read(buffer)
    while (bytesRead > 0):
        out_stream.writeBytes(bytesRead)
        out_stream.flush()
    in_stream.close()

out_stream.close()

The first problem with this code is that I am unsure how to read data from the input stream into the buffer. The second problem is that the output file gets created in HDFS, but nothing gets written to it (even if I write fixed values to it).

Upvotes: 2

Views: 2394

Answers (1)

Sebastian Dine

Reputation: 1009

After some investigation, I found a solution to my problem. It involves creating some JVM objects via the Spark context and using them for buffered I/O operations:

...
sc = SparkContext()
hadoop = sc._jvm.org.apache.hadoop
hdfs = hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())

raw_out = hdfs.create(hadoop.fs.Path('/tmp/combinedFile.parquet'))  # FSDataOutputStream
out_stream = sc._jvm.java.io.BufferedOutputStream(raw_out)          # buffer writes on the JVM side

for f in hdfs.listStatus(hadoop.fs.Path('/tmp/dirWithSeparatedFiles/')):

    raw_in = hdfs.open(f.getPath())                                 # FSDataInputStream
    in_stream = sc._jvm.java.io.BufferedInputStream(raw_in)         # buffer reads on the JVM side

    # copy the file byte by byte; the buffered streams keep the underlying I/O efficient
    while in_stream.available() > 0:
        out_stream.write(in_stream.read())
    out_stream.flush()
    in_stream.close()

out_stream.close()  # also flushes any remaining buffered bytes
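For larger files, the byte-at-a-time loop can still be slow, because every `read()` and `write()` is a separate Py4J round trip between Python and the JVM. A possible refinement, shown below as a minimal sketch rather than a tested implementation, is to allocate a JVM-side `byte[]` buffer with Py4J's `new_array` and copy in bulk. It assumes the same `sc`, `hadoop`, `hdfs` and `out_stream` objects as above; the 64 KB buffer size is an arbitrary choice.

    # a sketch, assuming sc, hadoop, hdfs and out_stream exist as in the snippet above
    buf = sc._gateway.new_array(sc._gateway.jvm.byte, 65536)  # JVM-side byte[] buffer

    for f in hdfs.listStatus(hadoop.fs.Path('/tmp/dirWithSeparatedFiles/')):
        in_stream = hdfs.open(f.getPath())      # FSDataInputStream
        n = in_stream.read(buf)                 # fills buf on the JVM side, returns bytes read
        while n > 0:
            out_stream.write(buf, 0, n)         # OutputStream.write(byte[], int, int)
            n = in_stream.read(buf)             # returns -1 at end of file
        in_stream.close()

    out_stream.flush()
    out_stream.close()

Because `buf` lives on the JVM side, the data never has to cross into Python at all; only the integer byte counts do.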

Upvotes: 3
