Reputation: 6752
I am loading some data into Spark with a wrapper function:
def load_data(filename):
    df = sqlContext.read.format("com.databricks.spark.csv") \
        .option("delimiter", "\t") \
        .option("header", "false") \
        .option("mode", "DROPMALFORMED") \
        .load(filename)
    # add the filename base as hostname
    (hostname, _) = os.path.splitext(os.path.basename(filename))
    (hostname, _) = os.path.splitext(hostname)
    df = df.withColumn('hostname', lit(hostname))
    return df
specifically, I am using a glob to load a bunch of files at once:
df = load_data( '/scratch/*.txt.gz' )
the files are:
/scratch/host1.txt.gz
/scratch/host2.txt.gz
...
I would like the column 'hostname' to actually contain the real name of the file being loaded (i.e. host1, host2, etc., rather than *).
How can I do this?
Upvotes: 77
Views: 86551
Reputation: 958
This code works with Databricks Auto Loader (the cloudFiles source):
from pyspark.sql.functions import col, current_timestamp
from pyspark.sql.types import StringType, TimestampType

dfOut = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", strFormat) \
    .option("cloudFiles.schemaLocation", strFolderPathArchive) \
    .option("delimiter", strSeparator) \
    .option("startingTimestamp", strStartingTimestamp) \
    .load(strFolderPathData) \
    .withColumn("timestamp_ingested", current_timestamp().cast(TimestampType())) \
    .withColumn("input_file_name", col("_metadata.file_path").cast(StringType()))
Upvotes: 1
Reputation: 958
input_file_name() is deprecated in favor of the _metadata column (e.g., "_metadata.file_name"); refer to the docs at file-metadata
Upvotes: 5
Reputation: 330093
You can use input_file_name, which:
Creates a string column for the file name of the current Spark task.
from pyspark.sql.functions import input_file_name
df.withColumn("filename", input_file_name())
Same thing in Scala:
import org.apache.spark.sql.functions.input_file_name
df.withColumn("filename", input_file_name)
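To get back to the original question's hostname column, you still need to strip the directory and the double extension from the full path that input_file_name() returns. Below is a pure-Python sketch of that extraction (the helper name hostname_from_path is mine, not from the original post), with the equivalent Spark expression shown in a comment; it assumes the files end in ".txt.gz" as in the question.

```python
import os

# Mirrors the splitext-twice logic from the question's load_data():
# take the last path component, then drop ".gz" and ".txt" in turn.
def hostname_from_path(path: str) -> str:
    base = os.path.basename(path)
    base, _ = os.path.splitext(base)  # drop ".gz"
    base, _ = os.path.splitext(base)  # drop ".txt"
    return base

# In Spark, the same idea per row, using input_file_name():
#   from pyspark.sql.functions import input_file_name, regexp_extract
#   df = df.withColumn(
#       "hostname",
#       regexp_extract(input_file_name(), r"([^/]+)\.txt\.gz$", 1),
#   )
```

hostname_from_path("/scratch/host1.txt.gz") gives "host1", which is the value the question wants in the hostname column instead of the glob.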
Upvotes: 168