duckertito

Reputation: 3645

How to save RDD data into JSON files, not folders

I am receiving the streaming data myDStream (DStream[String]) that I want to save in S3 (basically, for this question, it doesn't matter where exactly I want to save the outputs, but I am mentioning it just in case).

The following code works well, but it saves folders with names like jsonFile-19-45-46.json, and then inside each folder it saves the files _SUCCESS and part-00000.

Is it possible to save the data of each RDD[String] (these are JSON strings) into a JSON file, not a folder? I thought that repartition(1) would do the trick, but it didn't.

    myDStream.foreachRDD { rdd => 
       // datetimeString = ....
       rdd.repartition(1).saveAsTextFile("s3n://mybucket/keys/jsonFile-"+datetimeString+".json")
    }

Upvotes: 3

Views: 10271

Answers (4)

cmantas

Reputation: 1533

In case someone bumps into this, here's a similar approach for PySpark + DataFrames:

from hdfs import InsecureClient
HDFS_HTTP_URL = "http://my-hdfs-namenode-addr.my-org.net"

def write_dataframe_as_single_csv(df, output_fname, overwrite=False, compression='gzip'):
    "Helper function for writing a DF as a single CSV file with specified name"
    df = df.coalesce(1) # make sure output has 1 partition
    hdfs_client = InsecureClient(HDFS_HTTP_URL)
    
    tmp_path = output_fname + "_tmp"
    spark_writer = df.write.mode("overwrite") if overwrite else df.write
    
    spark_writer.csv(tmp_path, compression=compression)
    
    parts = [f for f in hdfs_client.list(tmp_path) if f[:5] == 'part-']
    assert len(parts) == 1, "something was wrong and multiple partitions found in the output"
    part_f = parts[0]
    
    if overwrite:
        hdfs_client.delete(output_fname)  # In case the output filename exists
    hdfs_client.rename(tmp_path + "/" + part_f, output_fname)  # Rename the single CSV partition
    hdfs_client.delete(tmp_path, recursive=True)  # cleanup previous output dir

This uses the hdfs Python package. In this answer you can find a similar approach that uses only the PySpark bindings.

Upvotes: 0

Anna Klein

Reputation: 2171

For Java I implemented this one. Hope it helps:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import java.io.File;

    FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
    // Find the single part file Spark wrote into /my.csv/, move it to the desired name, then delete the directory
    File dir = new File(System.getProperty("user.dir") + "/my.csv/");
    File[] files = dir.listFiles((d, name) -> name.endsWith(".csv"));
    fs.rename(new Path(files[0].toURI()), new Path(System.getProperty("user.dir") + "/csvDirectory/newData.csv"));
    fs.delete(new Path(System.getProperty("user.dir") + "/my.csv/"), true);

Upvotes: 0

Mariusz

Reputation: 13946

As an alternative to rdd.collect.mkString("\n"), you can use the Hadoop FileSystem library to clean up the output by moving the part-00000 file into its place. The code below works perfectly on the local filesystem and HDFS, but I'm unable to test it with S3:

val outputPath = "path/to/some/file.json"
rdd.saveAsTextFile(outputPath + "-tmp")

import org.apache.hadoop.fs.Path
val fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.rename(new Path(outputPath + "-tmp/part-00000"), new Path(outputPath))
fs.delete(new Path(outputPath  + "-tmp"), true)
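
For completeness, here is a minimal sketch of the rdd.collect.mkString("\n") alternative mentioned above. It assumes the RDD is small enough to fit in driver memory, reuses the spark, rdd and outputPath names from the snippet above, and writes a single file through the same Hadoop FileSystem API:

import java.nio.charset.StandardCharsets
import org.apache.hadoop.fs.Path

// Collect everything to the driver (only safe for small RDDs) and write it as one file
val fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
val out = fs.create(new Path(outputPath), true) // overwrite if the file already exists
try {
  out.write(rdd.collect().mkString("\n").getBytes(StandardCharsets.UTF_8))
} finally {
  out.close()
}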

Upvotes: 1

mrsrinivas

Reputation: 35434

AFAIK there is no option to save it as a single file, because Spark is a distributed processing framework and it's not good practice to write to a single file; rather, each partition writes its own file in the specified path.

We can only pass the output directory where we want to save the data. The OutputWriter will create one or more files (depending on the number of partitions) inside the specified path, with the part- file-name prefix.
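
For illustration, a minimal sketch (assuming a SparkContext sc and hypothetical local output paths) of how the number of part- files tracks the number of partitions:

// Three partitions produce three part- files inside the output directory
val rdd = sc.parallelize(Seq("""{"a": 1}""", """{"b": 2}""", """{"c": 3}"""), 3)
rdd.saveAsTextFile("/tmp/jsonFile-three-parts")   // part-00000, part-00001, part-00002, _SUCCESS

// coalesce(1) still writes a directory, but with a single part-00000 inside
rdd.coalesce(1).saveAsTextFile("/tmp/jsonFile-one-part")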

Upvotes: 2
