skrprince

Reputation: 91

Create a dataframe out of dbutils.fs.ls output in Databricks

I'm a beginner learning Spark programming (PySpark) on Databricks.

What am I trying to do?

List all the files in a directory and save the listing into a DataFrame so that I can filter, sort, etc. on it. Why? Because I am trying to find the biggest file in my directory.

Why doesn't the code below work? What am I missing?

from pyspark.sql.types import StringType

sklist = dbutils.fs.ls(sourceFile)

df = spark.createDataFrame(sklist,StringType())

Upvotes: 3

Views: 13760

Answers (4)

Julaayi

Reputation: 499

Adding the complete code based on @OlegK's answer, since it threw errors when I ran it as posted.

from pyspark.sql.types import StructType, StructField, StringType, LongType
#------
root_dir = 'abfss://myPath'

fslsSchema = StructType(
    [
        StructField('path', StringType()),
        StructField('name', StringType()),
        StructField('size', LongType()),
        StructField('modtime', LongType())
    ]
)

filelist = dbutils.fs.ls(root_dir)
df_files = spark.createDataFrame(filelist, fslsSchema)

df_files.createOrReplaceTempView("files_view") #Replacing view so that I can run as many times as needed

I also made one change to the SQL: REPLACE strips any '/' from the name column.

SELECT REPLACE(name, '/', ''), size, modtime
FROM files_view
WHERE name LIKE '<your file pattern>%.parq'
ORDER BY modtime;

Upvotes: 0

Henrique Florencio

Reputation: 3751

You don't need to set the schema:

df = spark.createDataFrame(dbutils.fs.ls(sourceFile))
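If the only goal is to find the biggest file and the listing fits comfortably on the driver, a DataFrame may not be needed at all. Below is a minimal sketch in plain Python. The FileInfo namedtuple and the sample paths are hypothetical stand-ins so the snippet runs outside Databricks; in a notebook you would pass the result of dbutils.fs.ls(sourceFile) instead, assuming each entry exposes a size attribute.

```python
from collections import namedtuple

# Hypothetical stand-in for the entries returned by dbutils.fs.ls,
# used only so this sketch runs outside a Databricks notebook.
FileInfo = namedtuple("FileInfo", ["path", "name", "size"])


def largest_file(entries):
    """Return the entry with the largest size, or None for an empty listing."""
    return max(entries, key=lambda entry: entry.size, default=None)


# Made-up sample listing; in Databricks use: listing = dbutils.fs.ls(sourceFile)
listing = [
    FileInfo("dbfs:/data/a.parq", "a.parq", 1_024),
    FileInfo("dbfs:/data/b.parq", "b.parq", 5_000_000_000),
    FileInfo("dbfs:/data/c.parq", "c.parq", 42),
]

biggest = largest_file(listing)
print(biggest.name, biggest.size)
```

For listings you still want to filter or join with Spark, the DataFrame route above remains the better fit.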

Upvotes: 6

OlegK

Reputation: 141

Updating the answer by @skrprince. The schema now has a new field, "modtime", which holds Unix epoch values. It's best to use LongType() for both size and modtime, since IntegerType will fail on larger values.

from pyspark.sql.types import StructType, StructField, StringType, LongType

fslsSchema = StructType(
  [
    StructField('path', StringType()),
    StructField('name', StringType()),
    StructField('size', LongType()),
    StructField('modtime', LongType())
  ]
)

filelist = dbutils.fs.ls('<your path here>')
df_files = spark.createDataFrame(filelist, fslsSchema)
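To illustrate why LongType is the safer choice: Spark's IntegerType is a 32-bit signed integer, so any size above 2**31 - 1 bytes (just under 2 GiB) does not fit. The sketch below is plain Python and does not call Spark; fits_integer_type is a hypothetical helper name used only for illustration.

```python
# Range of a 32-bit signed integer, which is what Spark's IntegerType stores.
INT32_MIN = -2**31
INT32_MAX = 2**31 - 1


def fits_integer_type(value: int) -> bool:
    """Return True if value is representable as a 32-bit signed integer."""
    return INT32_MIN <= value <= INT32_MAX


one_gib = 1024**3
three_gib = 3 * 1024**3

print(fits_integer_type(one_gib))    # a 1 GiB file size fits
print(fits_integer_type(three_gib))  # a 3 GiB file size does not
```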

You can also create a temporary view to execute SQL queries against your dataframe data:

df_files.createTempView("files_view")

Then you can run queries in the same notebook like the example below:

%sql
SELECT name, size, modtime
FROM files_view
WHERE name LIKE '<your file pattern>%.parq'
ORDER BY modtime
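The raw modtime values are hard to read when you inspect the query results. Here is a minimal sketch for converting one to a timestamp, assuming modtime is in milliseconds since the Unix epoch (that is my assumption; check a value from your own listing, and drop the division if yours is in seconds). modtime_to_utc is a hypothetical helper name.

```python
from datetime import datetime, timezone


def modtime_to_utc(modtime_ms: int) -> datetime:
    """Convert epoch milliseconds (assumed unit) to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(modtime_ms / 1000, tz=timezone.utc)


# Made-up sample value, not taken from a real listing.
sample = modtime_to_utc(1_600_000_000_000)
print(sample.isoformat())
```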

Upvotes: 2

skrprince

Reputation: 91

OK, I actually figured it out :). I'll leave the question here in case someone benefits from it.

The problem was with the schema: not all the elements in the list were of string type. So I explicitly created a schema and passed it to the createDataFrame function.

Working code:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

ddlSchema = StructType([
    StructField('path', StringType()),
    StructField('name', StringType()),
    StructField('size', IntegerType())
])

sklist = dbutils.fs.ls(sourceFile)
df = spark.createDataFrame(sklist,ddlSchema)

Upvotes: 6
