Reputation: 796
There is input_file_name function in Apache Spark which is used by me to add new column to Dataset with the name of file which is currently being processed.
The problem is that I'd like to somehow customize this function to return only file name, ommitting the full path to it on s3.
For now, I am doing replacement of the path on the second step using map function:
val initialDs = spark.sqlContext.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path).withColumn("input_file_name", input_file_name)
...
...
def fromFile(fileName: String): String = {
val baseName: String = FilenameUtils.getBaseName(fileName)
val tmpFileName: String = baseName.substring(0, baseName.length - 8) //here is magic conversion ;)
this.valueOf(tmpFileName)
}
But I'd like to use something like
val initialDs = spark.sqlContext.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path).withColumn("input_file_name", **customized_input_file_name_function**)
Upvotes: 6
Views: 10696
Reputation: 2571
Borrowing from a related question here, the following method is more portable and does not require a custom UDF.
Spark SQL Code Snippet: reverse(split(path, '/'))[0]
Spark SQL Sample:
WITH sample_data as (
SELECT 'path/to/my/filename.txt' AS full_path
)
SELECT
full_path
, reverse(split(full_path, '/'))[0] as basename
FROM sample_data
Explanation:
The split()
function breaks the path into it's chunks and reverse()
puts the final item (the file name) in front of the array so that [0]
can extract just the filename.
Full Code example here :
spark.sql(
"""
|WITH sample_data as (
| SELECT 'path/to/my/filename.txt' AS full_path
| )
| SELECT
| full_path
| , reverse(split(full_path, '/'))[0] as basename
| FROM sample_data
|""".stripMargin).show(false)
Result :
+-----------------------+------------+
|full_path |basename |
+-----------------------+------------+
|path/to/my/filename.txt|filename.txt|
+-----------------------+------------+
Upvotes: 5
Reputation: 29165
commons io is natural/easiest import in spark means(no need to add additional dependency...)
import org.apache.commons.io.FilenameUtils
getBaseName(String fileName)
Gets the base name, minus the full path and extension, from a full fileName.
val baseNameOfFile = udf((longFilePath: String) => FilenameUtils.getBaseName(longFilePath))
Usage is like...
yourdataframe.withColumn("shortpath" ,baseNameOfFile(yourdataframe("input_file_name")))
.show(1000,false)
Upvotes: 0
Reputation: 35414
In Scala:
#register udf
spark.udf
.register("get_only_file_name", (fullPath: String) => fullPath.split("/").last)
#use the udf to get last token(filename) in full path
val initialDs = spark.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path)
.withColumn("input_file_name", get_only_file_name(input_file_name))
Edit: In Java as per comment
#register udf
spark.udf()
.register("get_only_file_name", (String fullPath) -> {
int lastIndex = fullPath.lastIndexOf("/");
return fullPath.substring(lastIndex, fullPath.length - 1);
}, DataTypes.StringType);
import org.apache.spark.sql.functions.input_file_name
#use the udf to get last token(filename) in full path
Dataset<Row> initialDs = spark.read()
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path)
.withColumn("input_file_name", get_only_file_name(input_file_name()));
Upvotes: 11