Reputation: 3645
I am receiving the streaming data myDStream
(DStream[String]
) that I want to save in S3 (basically, for this question, it doesn't matter where exactly do I want to save the outputs, but I am mentioning it just in case).
The following code works well, but it saves folders with the names like jsonFile-19-45-46.json
, and then inside the folders it saves files _SUCCESS
and part-00000
.
Is it possible to save each RDD[String]
(these are JSON strings) data into the JSON files, not the folders? I thought that repartition(1)
had to make this trick, but it didn't.
myDStream.foreachRDD { rdd =>
// datetimeString = ....
rdd.repartition(1).saveAsTextFile("s3n://mybucket/keys/jsonFile-"+datetimeString+".json")
}
Upvotes: 3
Views: 10271
Reputation: 1533
In case someone bumps into that, here's a similar answer for PySpark+Dataframes
from hdfs import InsecureClient
HDFS_HTTP_URL = "http://my-hdfs-namenode-addr.my-org.net"
def write_dataframe_as_single_csv(df, output_fname, overwrite=False, compression='gzip'):
"Helper function for writing a DF as a single CSV file with specified name"
df = df.coalesce(1) # make sure output has 1 partition
hdfs_client = InsecureClient(HDFS_HTTP_URL)
tmp_path = output_fname + "_tmp"
spark_writer = df.write.mode("overwrite") if overwrite else df.write
spark_writer.csv(tmp_path, compression=compression)
parts = [f for f in hdfs_client.list(tmp_path) if f[:5] == 'part-']
assert len(parts) == 1, "something was wrong and multiple partitions found in the output"
part_f = parts[0]
if overwrite:
hdfs_client.delete(output_fname) # In case the output filename exists
hdfs_client.rename(tmp_path + "/" + part_f, output_fname) # Rename the single CSV partition
hdfs_client.delete(tmp_path, recursive=True) # cleanup previous output dir
This uses the hdfs Python package. In this answer you can find a similar approach, using only pyspark bindings
Upvotes: 0
Reputation: 2171
For JAVA I implemented this one. Hope it helps:
val fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
File dir = new File(System.getProperty("user.dir") + "/my.csv/");
File[] files = dir.listFiles((d, name) -> name.endsWith(".csv"));
fs.rename(new Path(files[0].toURI()), new Path(System.getProperty("user.dir") + "/csvDirectory/newData.csv"));
fs.delete(new Path(System.getProperty("user.dir") + "/my.csv/"), true);
Upvotes: 0
Reputation: 13946
As an alternative to rdd.collect.mkString("\n")
you can use hadoop Filesystem library to cleanup output by moving part-00000
file into it's place. Below code works perfectly on local filesystem and HDFS, but I'm unable to test it with S3:
val outputPath = "path/to/some/file.json"
rdd.saveAsTextFile(outputPath + "-tmp")
import org.apache.hadoop.fs.Path
val fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.rename(new Path(outputPath + "-tmp/part-00000"), new Path(outputPath))
fs.delete(new Path(outputPath + "-tmp"), true)
Upvotes: 1
Reputation: 35434
AFAIK there is no option to save it as a file. Because it's a distributed processing framework and it's not a good practice write on single file rather than each partition writes it's own files in the specified path.
We can pass only output directory where we wanted to save the data. OutputWriter will create file(s)(depends on partitions) inside specified path with
part-
file name prefix.
Upvotes: 2