Reputation: 2085
I have a dataframe created in spark after reading a table from postgres as below.
import java.util.Properties

val url = "jdbc:postgresql://localhost:5432/testdb"
val connectionProperties = new Properties()
connectionProperties.setProperty("driver", "org.postgresql.Driver")
connectionProperties.setProperty("user", "testDB")
connectionProperties.setProperty("password", "123456")
val query = "(select * from testdb.datatable) as datatable"
val dataDF = spark.read.jdbc(url, query, connectionProperties)
I can see the count of data from the dataframe:
scala> dataDF.count
res0: Long = 3907891
Sample output:
scala> dataDF.show(5)
+-----------+---+--------+
|source_name| id|location|
+-----------+---+--------+
|        DB2| 10|    Hive|
|        SAP| 20|    Hive|
| SQL Server| 17|    Hive|
|     Oracle| 21|    Hive|
|        DB2| 33|    Hive|
+-----------+---+--------+
The dataframe contains a column "id" of type Integer whose values range from 10 to 50.
Is there any way I can split the dataframe into 4 partitions based on the id column and write each partition to a separate file, so that file1 contains ids 10-20, file2 ids 21-30, file3 ids 31-40, and file4 ids 41-50?
Upvotes: 0
Views: 751
Reputation: 1704
If you know the range of the ids, I would go with something simple.
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.when
import spark.implicits._

val data = Seq(
  ("db2", 10, "Hive"),
  ("sap", 20, "Hive"),
  ("sql", 17, "Hive"),
  ("oracle", 21, "Hive"),
  ("server", 33, "Hive"),
  ("risk", 43, "Hive")
).toDF("source_name", "id", "location")
// Derive a bucket label from the id and use it as the partition column.
val bucketed = data.withColumn("bucket",
  when($"id".between(0, 10), "1-10")
    .when($"id".between(11, 20), "11-20")
    .when($"id".between(21, 30), "21-30")
    .when($"id".between(31, 40), "31-40")
    .when($"id".between(41, 50), "41-50")
    .otherwise("50+"))

// Each distinct bucket value becomes its own output directory (bucket=11-20, ...).
bucketed.write.option("header", true)
  .mode(SaveMode.Overwrite)
  .partitionBy("bucket")
  .csv("bucketing")
Upvotes: 1