user12525899

Reputation: 133

Translate Scala code to Rename and Move CSV file - Spark - PySpark

I am using the Scala code below to rename a CSV file to a TXT file and move the TXT file. I need to translate this code to Python/PySpark, but I am having problems (I am not well versed in Python). I would highly appreciate your help. Thanks in advance!

//Prepare to rename file
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(sc.hadoopConfiguration)

//Create variables
val table_name = dbutils.widgets.get("table_name") // getting table name
val filePath = "dbfs:/mnt/datalake/" + table_name + "/" // path where original csv file name is located
val fileName = fs.globStatus(new Path(filePath+"part*"))(0).getPath.getName // getting original csv file name
val newfilename = table_name + ".txt" // renaming and transforming csv into txt
val curatedfilePath = "dbfs:/mnt/datalake/" + newfilename // curated path + new file name

//Move to curated folder
dbutils.fs.mv(filePath + fileName, curatedfilePath)

Here is the Python code:

%python

#Create variables
table_name = dbutils.widgets.get("table_name") # getting table name
filePath = "dbfs:/mnt/datalake/" + table_name + "/" # path where original csv file name is located
newfilename = table_name + ".txt" # transforming csv into txt
curatedfilePath = "dbfs:/mnt/datalake/" + newfilename # curated path + new file name

#Save CSV file
df_curated.coalesce(1).replace("", None).write.mode("overwrite").save(filePath,format='csv', delimiter='|', header=True, nullValue=None)

# getting original csv file name
for f in filePath:
            if f[1].startswith("part-00000"): 
                 original_file_name = f[1]

#move to curated folder
dbutils.fs.mv(filePath + fileName, curatedfilePath)

I am having a problem with the "getting original csv file name" part. It throws the following error:

IndexError: string index out of range
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<command-3442953727364942> in <module>()
     11 # getting original csv file name
     12 for f in filePath:
---> 13             if f[1].startswith("part-00000"):
     14                  original_file_name = f[1]
     15 

IndexError: string index out of range

Upvotes: 0

Views: 985

Answers (1)

blackbishop

Reputation: 32660

In the Scala code, you're using hadoop.fs.globStatus to list the part files from the folder where you save the DataFrame.
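As for the IndexError: in your Python snippet, `for f in filePath:` iterates over the *characters* of the path string, not over the files in the folder, so each `f` is a one-character string and `f[1]` is out of range. A minimal illustration:

```python
filePath = "dbfs:/mnt/datalake/my_table/"

# Iterating over a string yields its characters one by one
first = next(iter(filePath))  # "d"

# A one-character string has no index 1, hence the IndexError
try:
    first[1]
except IndexError as e:
    print(e)  # string index out of range
```

To list the actual files you need `dbutils.fs.ls(filePath)` (checking `f.name`), or the Hadoop FileSystem API as below.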

In Python you can do the same by accessing hadoop.fs via the JVM like this:

# Hadoop configuration and the Path class, via the JVM gateway
conf = sc._jsc.hadoopConfiguration()
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path

# list the part files in the folder and take the first one's name
part_files = Path(filePath).getFileSystem(conf).globStatus(Path(filePath + "/part*"))
file_name = part_files[0].getPath().getName()
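Putting it together, a full translation could look like the sketch below. It assumes a Databricks notebook where `sc` and `dbutils` are already defined (they are not defined in this snippet), and the helper names are just illustrative:

```python
# Sketch of a full translation; assumes a Databricks notebook where
# sc and dbutils exist. Helper names are hypothetical.

def source_folder(table_name):
    # folder where the part-* csv file was written
    return "dbfs:/mnt/datalake/" + table_name + "/"

def curated_path(table_name):
    # curated path + new .txt file name
    return "dbfs:/mnt/datalake/" + table_name + ".txt"

def move_part_file(sc, dbutils, table_name):
    file_path = source_folder(table_name)

    # same globStatus call as in the Scala code, via the JVM gateway
    conf = sc._jsc.hadoopConfiguration()
    Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
    part_files = Path(file_path).getFileSystem(conf).globStatus(Path(file_path + "part*"))
    file_name = part_files[0].getPath().getName()

    # move and rename in one step, like dbutils.fs.mv in the Scala version
    dbutils.fs.mv(file_path + file_name, curated_path(table_name))
```

Called as `move_part_file(sc, dbutils, table_name)` after writing the DataFrame, this mirrors the Scala flow: glob the part file, then move it to the curated path under the new `.txt` name.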

Upvotes: 2
