Metadata

Reputation: 2085

How to split dataframe based on a range of values in a column and store them in separate files?

I have a dataframe created in spark after reading a table from postgres as below.

import java.util.Properties

val url = "jdbc:postgresql://localhost:5432/testdb"
val connectionProperties = new Properties()
connectionProperties.setProperty("driver", "org.postgresql.Driver")
connectionProperties.setProperty("user", "testDB")
connectionProperties.setProperty("password", "123456")
val query  = "(select * from testdb.datatable) as datatable"
val dataDF = spark.read.jdbc(url, query, connectionProperties)

I can see the count of data from the dataframe:

scala> dataDF.count
res0: Long = 3907891

sample output:

    scala> dataDF.take(5)

|source_name| id |location|
|-----------|----|--------|
|    DB2    | 10 |Hive    |
|    SAP    | 20 |Hive    |
| SQL Server| 17 |Hive    |
|   Oracle  | 21 |Hive    |
|    DB2    | 33 |Hive    |

The dataframe contains a column "id" of type Integer whose values range from 10 to 50. Is there any way I can split the dataframe into 4 partitions and write each partition to a separate file based on the id column, so that file1 contains ids 10-20, file2 ids 21-30, file3 ids 31-40, and file4 ids 41-50?

Upvotes: 0

Views: 751

Answers (1)

Gelerion

Reputation: 1704

If you know the range of the ids, I would go with something simple.

import spark.implicits._

val data = Seq(
      ("db2", 10, "Hive"),
      ("sap", 20, "Hive"),
      ("sql", 17, "Hive"),
      ("oracle", 21, "Hive"),
      ("server", 33, "Hive"),
      ("risk", 43, "Hive")
    ).toDF("source_name", "id", "location")

import org.apache.spark.sql.functions.when

val bucketed = data.withColumn("bucket",
         when($"id".between(0, 10), "1-10")
         .when($"id".between(11, 20), "11-20")
         .when($"id".between(21, 30), "21-30")
         .when($"id".between(31, 40), "31-40")
         .when($"id".between(41, 50), "41-50")
         .otherwise("50+"))

import org.apache.spark.sql.SaveMode

bucketed.write.option("header", true)
      .mode(SaveMode.Overwrite)
      .partitionBy("bucket")
      .csv("bucketing")

Upvotes: 1

Related Questions