Kadri

Reputation: 51

Renaming output file using Spark tool within Google Data Fusion

I have a pipeline in Google Data Fusion which produces a CSV file with the name "part-00000-XXXXXX" (and a file called "_SUCCESS") in a target directory in a Google Cloud bucket. The rest of the file name after "part-00000" is always different and random.


The pipeline produces the new output by parsing, processing, and joining input files (all sourced from Google Cloud Storage locations), then concatenates that new output with the older existing output file, and writes the "part-00000" file to the same location as the older output file, which is named "internal_dashboard.csv".

What I need, by any means available, is to rename the "part-00000" file to "internal_dashboard.csv" so that it replaces the older file.

The following are my attempts, written in the Spark Sink plugin and adapted from several other answers. The idea was to first find a file with "part-00000" in the filename, then rename it and overwrite the old file. All my attempts have failed so far:

import java.nio.file.{Files, Paths, StandardCopyOption}
import scala.util.matching.Regex

def recursiveListFiles(f: File, r: Regex): Array[File] = {
  val these = f.listFiles
  val good = these.filter(f => r.findFirstIn(f.getName).isDefined)
  good ++ these.filter(_.isDirectory).flatMap(recursiveListFiles(_,r))
}


def moveRenameFile(source: String, destination: String): Unit = {
    val path = Files.move(
        Paths.get(source),
        Paths.get(destination),
        StandardCopyOption.REPLACE_EXISTING
    )
    // could return `path`
}


def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {

  val fullpath = "gs://some_bucket/output/internal_dashboard"
  val targetfilename = "internal_dashboard.csv"

  df.coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .mode("append") // "overwrite" "append"
    .save(fullpath)
 
  val existingfilename = recursiveListFiles(new File(fullpath), "part-00000-.*")
  moveRenameFile(fullpath+existingfilename.head,fullpath+targetfilename)
}


import java.io.File

def getListOfFiles(dir: File, extensions: List[String]): List[File] = {
    dir.listFiles.filter(_.isFile).toList.filter { file =>
        extensions.exists(file.getName.startsWith(_))
    }
}

def moveRenameFile(source: String, destination: String): Unit = {
    val path = Files.move(
        Paths.get(source),
        Paths.get(destination),
        StandardCopyOption.REPLACE_EXISTING
    )
    // could return `path`
}


def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {

  val fullpath = "gs://some_bucket/output/internal_dashboard"
  val targetfilename = "internal_dashboard.csv"

  df.coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .mode("append") // "overwrite" "append"
    .save(fullpath)
 
  val suffixList = List("part-00000")
  val existingfilename = getListOfFiles(new File(fullpath), suffixList )
  moveRenameFile(fullpath+existingfilename.head,fullpath+targetfilename)
}

def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {

  val fullpath = "gs://some_bucket/output/internal_dashboard"
  val targetfilename = "internal_dashboard.csv"
  val pathandfile = fullpath + "/" + targetfilename

  df.coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .mode("append") // "overwrite" "append"
    .save(pathandfile )

dbutils.fs.ls(fullpath).filter(file=>file.name.endsWith("csv")).foreach(f => dbutils.fs.rm(f.path,true))
dbutils.fs.mv(dbutils.fs.ls(pathandfile).filter(file=>file.name.startsWith("part-00000"))(0).path,pathandfile)
dbutils.fs.rm(pathandfile,true)
}

I need help, in Scala or by any other means, renaming the "part-00000" file to "internal_dashboard.csv" and overwriting the older version.

For those who haven't used Data Fusion, these are the tools at my disposal:

Description
Executes user-provided Spark code in Scala.

Use Case
This plugin can be used when you want to run arbitrary Spark code.

Properties
mainClass: The fully qualified class name for the Spark application. It must be either an object that has a main method defined inside, with the method signature def main(args: Array[String]): Unit, or a class that extends the CDAP co.cask.cdap.api.spark.SparkMain trait and implements the run method, with the method signature def run(implicit sec: SparkExecutionContext): Unit

Description
Executes user-provided Spark code in Python.

Use Case
This plugin can be used when you want to run arbitrary Spark code.

Edit:

(Nov 2, 2020) I've just learned that there are also Google Cloud Functions, which can be written in Python (or Java) and triggered whenever a change happens in the bucket they are attached to. If anyone knows how to write such a function so that it renames the "part-00000" file and overwrites the old file when triggered, please let me know. I'll give it a try if all else fails.

Upvotes: 0

Views: 1472

Answers (2)

CrazySK

Reputation: 1

An answer that does not use Spark: to rename the file, use the GCS Move plugin. I used this as a workaround.

Source path --> fusion_output/folder3/path1/part-0000

Destination path -->enable macro --> fusion_output/folder3/path1/Store_${logicalStartTime(yyyy-MM-dd'T'HH-mm-ss,1d-4h+30m)}.csv

Upvotes: 0

Nir Hedvat

Reputation: 870

Avoid renaming objects on AWS S3. There is no real rename operation there; a "rename" is a copy followed by a delete ("cut and paste"), which is very expensive.

You can try :

import org.apache.spark.sql.SaveMode
df.write.mode(SaveMode.Overwrite).parquet(outputBasePath)

If you insist on using 'rename', use the Hadoop libraries rather than java.nio:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration

val srcPath = new Path("source/...")
val destPath = new Path("dest/...")

srcPath.getFileSystem(new Configuration()).rename(srcPath, destPath)

Note: when working with AWS S3, both paths must be in the same bucket (each bucket has its own FileSystem object, and rename(...) works within a single FileSystem).
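
Applied to the question's case, the Hadoop approach might look like the sketch below. It is not tested inside Data Fusion and rests on some assumptions: the names RenamePartFile and renamePartFile are made up for illustration, the Hadoop connector for the path's scheme (gs:// in the question) is assumed to be on the classpath, and the directory is assumed to hold exactly one "part-00000-*" file. The Hadoop calls used (getFileSystem, globStatus, exists, delete, rename) are standard FileSystem methods.

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Hypothetical helper, named here for illustration only.
object RenamePartFile {

  /** Renames the single part-00000-* file under `dir` to `targetName`,
    * replacing any existing file with that name. Returns the new path. */
  def renamePartFile(dir: String, targetName: String, conf: Configuration): Path = {
    val dirPath = new Path(dir)
    // Resolve the FileSystem from the path so the scheme (gs://, file://, ...)
    // selects the implementation, instead of java.nio, which only knows local paths.
    val fs: FileSystem = dirPath.getFileSystem(conf)

    // globStatus can return null in some cases, so wrap it in Option.
    val parts: Array[Path] =
      Option(fs.globStatus(new Path(dirPath, "part-00000-*")))
        .getOrElse(Array.empty[FileStatus])
        .map(_.getPath)

    // Assumption: one part file per directory (df.coalesce(1), no leftovers).
    require(parts.length == 1,
      s"expected exactly one part file in $dir, found ${parts.length}")

    val target = new Path(dirPath, targetName)
    // rename generally does not overwrite, so delete the old file first.
    if (fs.exists(target)) fs.delete(target, false)
    if (!fs.rename(parts.head, target)) {
      throw new IOException(s"could not rename ${parts.head} to $target")
    }
    target
  }
}
```

Inside the question's sink function this could be called right after save(fullpath), for example as RenamePartFile.renamePartFile(fullpath, "internal_dashboard.csv", df.sparkSession.sparkContext.hadoopConfiguration). Two caveats: the delete followed by the rename is not atomic, and with mode("append") older part files stay in the directory, so the "exactly one" check would fail unless they are cleaned up first.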

Upvotes: 1
