Sandy

Reputation: 141

Split Large Dataframe into multiple smaller dataframe

I have a large dataframe with more than 100 columns, where sets of columns share the same names distinguished by a unique number. I need to create multiple smaller dataframes, one per unique number.

Yes, the column names follow the same pattern, and the number of such groups can be 64 or 128: net1, net2, net3...net64...net128

So I need 64 or 128 sub-dataframes accordingly. I cannot use startsWith alone, because the prefix net1 would also match net10, net11...net100, net101, and so on.

I've written a solution in Spark + Scala that works fine, but I feel there must be an easier way to achieve this dynamically.

df_net.printSchema()

|-- net1: string (nullable = true)
|-- net1_a: integer (nullable = true)
|-- net1_b: integer (nullable = true)
|-- net1_c: integer (nullable = true)
|-- net1_d: integer (nullable = true)
|-- net1_e: integer (nullable = true)
|-- net2: string (nullable = true)
|-- net2_a: integer (nullable = true)
|-- net2_b: integer (nullable = true)
|-- net2_c: integer (nullable = true)
|-- net2_d: integer (nullable = true)
|-- net2_e: integer (nullable = true)
|-- net3: string (nullable = true)
|-- net3_a: integer (nullable = true)
|-- net3_b: integer (nullable = true)
|-- net3_c: integer (nullable = true)
|-- net3_d: integer (nullable = true)
|-- net3_e: integer (nullable = true)
|-- net4: string (nullable = true)
|-- net4_a: integer (nullable = true)
|-- net4_b: integer (nullable = true)
|-- net4_c: integer (nullable = true)
|-- net4_d: integer (nullable = true)
|-- net4_e: integer (nullable = true)
|-- net5: string (nullable = true)
|-- net5_a: integer (nullable = true)
|-- net5_b: integer (nullable = true)
|-- net5_c: integer (nullable = true)
|-- net5_d: integer (nullable = true)
|-- net5_e: integer (nullable = true)
val df_net1 = df_net
  .filter($"net1".isNotNull)
  .select("net1", "net1_a", "net1_b", "net1_c", "net1_d", "net1_e")

val df_net2 = df_net
  .filter($"net2".isNotNull)
  .select("net2", "net2_a", "net2_b", "net2_c", "net2_d", "net2_e")

val df_net3 = df_net
  .filter($"net3".isNotNull)
  .select("net3", "net3_a", "net3_b", "net3_c", "net3_d", "net3_e")

These are the smaller dataframes, filtered and selected per unique number.
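
What I'm imagining is something dynamic along these lines (a rough sketch only; it assumes every group has exactly the suffixes _a through _e, and n stands for whichever group count the input has):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// One sub-DataFrame per group, without repeating the select by hand
val n = 128 // or 64, depending on the input
val subDfs: Seq[DataFrame] = (1 to n).map { i =>
  val cols = s"net$i" +: Seq("a", "b", "c", "d", "e").map(s => s"net${i}_$s")
  df_net.filter(col(s"net$i").isNotNull).select(cols.map(col): _*)
}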

Upvotes: 1

Views: 229

Answers (4)

Ram Ghadiyaram

Reputation: 29237

Assuming you have common prefix names in the columns, this solution will work for a variable number of columns with the same prefix.

package examples

import org.apache.log4j.Level
import org.apache.spark.sql.{DataFrame, SparkSession}

object FilterDataframes extends App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)
  val spark = SparkSession.builder()
    .appName(this.getClass.getName)
    .config("spark.master", "local[*]").getOrCreate()

  import spark.implicits._

  val df = spark
    .sparkContext.parallelize(Seq(new MyNets())).toDF
  df.show


  case class MyNets(
                     net1: Int = 1,
                     net1_a: Int = 2,
                     net1_b: Int = 3,
                     net1_c: Int = 4,
                     net1_d: Int = 4,
                     net1_e: Int = 5,
                     net2: Int = 6,
                     net2_a: Int = 7,
                     net2_b: Int = 8,
                     net2_c: Int = 9,
                     net2_d: Int = 10,
                     net2_e: Int = 11,
                     net3: Int = 12,
                     net3_a: Int = 13,
                     net3_b: Int = 14,
                     net3_c: Int = 15,
                     net3_d: Int = 16,
                     net4_e: Int = 17,
                     net5: Int = 18,
                     net5_a: Int = 19,
                     net5_b: Int = 20,
                     net5_c: Int = 21,
                     net5_d: Int = 22,
                     net5_e: Int = 23
                   )
  val net1: DataFrame = df.select(df.columns.filter(_.startsWith("net1")).map(df(_)): _*)
  val net2: DataFrame = df.select(df.columns.filter(_.startsWith("net2")).map(df(_)): _*)
  val net3: DataFrame = df.select(df.columns.filter(_.startsWith("net3")).map(df(_)): _*)
  val net4: DataFrame = df.select(df.columns.filter(_.startsWith("net4")).map(df(_)): _*)
  val net5: DataFrame = df.select(df.columns.filter(_.startsWith("net5")).map(df(_)): _*)

  net1.show
  net2.show
  net3.show
  net4.show
  net5.show
}

Result:

+----+------+------+------+------+------+----+------+------+------+------+------+----+------+------+------+------+------+----+------+------+------+------+------+
|net1|net1_a|net1_b|net1_c|net1_d|net1_e|net2|net2_a|net2_b|net2_c|net2_d|net2_e|net3|net3_a|net3_b|net3_c|net3_d|net4_e|net5|net5_a|net5_b|net5_c|net5_d|net5_e|
+----+------+------+------+------+------+----+------+------+------+------+------+----+------+------+------+------+------+----+------+------+------+------+------+
|   1|     2|     3|     4|     4|     5|   6|     7|     8|     9|    10|    11|  12|    13|    14|    15|    16|    17|  18|    19|    20|    21|    22|    23|
+----+------+------+------+------+------+----+------+------+------+------+------+----+------+------+------+------+------+----+------+------+------+------+------+

+----+------+------+------+------+------+
|net1|net1_a|net1_b|net1_c|net1_d|net1_e|
+----+------+------+------+------+------+
|   1|     2|     3|     4|     4|     5|
+----+------+------+------+------+------+

+----+------+------+------+------+------+
|net2|net2_a|net2_b|net2_c|net2_d|net2_e|
+----+------+------+------+------+------+
|   6|     7|     8|     9|    10|    11|
+----+------+------+------+------+------+

+----+------+------+------+------+
|net3|net3_a|net3_b|net3_c|net3_d|
+----+------+------+------+------+
|  12|    13|    14|    15|    16|
+----+------+------+------+------+

+------+
|net4_e|
+------+
|    17|
+------+

+----+------+------+------+------+------+
|net5|net5_a|net5_b|net5_c|net5_d|net5_e|
+----+------+------+------+------+------+
|  18|    19|    20|    21|    22|    23|
+----+------+------+------+------+------+

Now you can apply the null check on the resultant dataframes.
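
For example (a sketch; it assumes the bare column, such as net1, is the one to test):

// Keep only the rows where the group's base column is populated
val net1NonNull = net1.filter(net1("net1").isNotNull)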

UPDATE: since you can have up to 128 nets, a bare prefix overlaps across groups (net1 also matches net10, net100, and so on). Adding the _ separator to startsWith avoids the overlap; the bare column (net1 itself) still needs an exact match:


// Match the bare group column (e.g. "net1") exactly, plus its "_"-suffixed
// columns, so that "net1" does not also pick up "net10", "net100", etc.
def netGroup(i: Int): DataFrame =
  df.select(df.columns.filter(c => c == s"net$i" || c.startsWith(s"net${i}_")).map(df(_)): _*)

val net1: DataFrame = netGroup(1)
val net2: DataFrame = netGroup(2)
val net3: DataFrame = netGroup(3)
val net4: DataFrame = netGroup(4)
val net5: DataFrame = netGroup(5)

This still produces the same output as shown above, without overlapping with the other nets.
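
To cover all 64 or 128 groups without naming each val, the same helper can be driven from a range (a sketch; the count 128 is an assumption, use whatever the input actually has):

// Build every group in one pass over the column list
val allNets: Map[Int, DataFrame] = (1 to 128).map(i => i -> netGroup(i)).toMap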

Upvotes: 1

Ryan Widmaier

Reputation: 8523

I would collapse your different groups of net fields into one set with a net_type field. Then you can do a partitioned write, which lets you easily load an individual set, or more than one, as needed.

This gives you a couple of benefits:

  • If you need to do aggregations, such as a count by type, they are easy to express
  • You can load one set or any number of subsets.
  • Spark will automatically decide which partitions to read based on the values you filter net_type on (see the read-back sketch at the end of this answer)
  • All the output files are written in a single pass by Spark instead of one pass per group

Here is the code to do it:

import org.apache.spark.sql.functions._
import spark.implicits._ // needed for .toDS outside the spark-shell

case class Net(net1:Integer, 
               net1_a:Integer,
               net1_b:Integer,
               net2:Integer,
               net2_a:Integer,
               net2_b:Integer)

val df = Seq(
    Net(1, 1, 1, null, null, null),
    Net(2, 2, 2, null, null, null),
    Net(null, null, null, 3, 3, 3)
).toDS

// You could find these automatically if you wanted
val columns = Seq("net1", "net2")

// Turn each group of fields into a struct with a populated "net_type" field
val structColumns = columns.map(c => 
    when(col(c).isNotNull, 
        struct(
            lit(c) as "net_type",
            col(c) as "net",
            col(c + "_a") as "net_a",
            col(c + "_b") as "net_b"
        )
    )
)

// Put into one column the populated group for each row
val df2 = df.select(coalesce(structColumns:_*) as "net")

// Flatten back down to top level fields instead of being in a struct
val df3 = df2.selectExpr("net.*")

df3.write.partitionBy("net_type").parquet("/some/file/path.parquet")

This would give you rows like this:

scala> df3.show
+--------+---+-----+-----+
|net_type|net|net_a|net_b|
+--------+---+-----+-----+
|    net1|  1|    1|    1|
|    net1|  2|    2|    2|
|    net2|  3|    3|    3|
+--------+---+-----+-----+

And files on your file system like this:

/some/file/path.parquet/
    net_type=net1/
        part1.parquet
        ..
    net_type=net2/
        part1.parquet
        ..
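
Reading a single group back then only touches that partition's files (a sketch; the path and group name are placeholders, and a SparkSession named spark is assumed to be in scope):

// Spark prunes down to net_type=net1/ based on the filter
val net1Back = spark.read.parquet("/some/file/path.parquet")
    .where(col("net_type") === "net1")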

Upvotes: 0

msrv499

Reputation: 339

It seems the columns in the dataframe follow a pattern, since they start with a common string. If that won't change, you can use something like the following.

val df_net1 = df.select(df.columns.filter(a => a.startsWith("net1")).map(a => df(a)): _*)

val df_net2 = df.select(df.columns.filter(a => a.startsWith("net2")).map(a => df(a)): _*)

val df_net3 = df.select(df.columns.filter(a => a.startsWith("net3")).map(a => df(a)): _*)
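
As noted in the question, once there are more than nine groups, startsWith("net1") also matches net10, net100, and so on; a stricter predicate (a sketch) avoids that:

// Exact match on the bare column, prefix match only with the "_" separator
val df_net1 = df.select(
  df.columns.filter(a => a == "net1" || a.startsWith("net1_")).map(a => df(a)): _*)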

Upvotes: 1

Charlie Flowers

Reputation: 1380

Assuming your DF is split predictably into groups of 6 columns, the following will produce an Iterator[Dataset] where each element contains 6 columns from the parent dataset:

scala> df.printSchema
root
 |-- net1: string (nullable = false)
 |-- net1_a: integer (nullable = false)
 |-- net1_b: integer (nullable = false)
 |-- net1_c: integer (nullable = false)
 |-- net1_d: integer (nullable = false)
 |-- net1_e: integer (nullable = false)
 |-- net2: string (nullable = false)
 |-- net2_a: integer (nullable = false)
 |-- net2_b: integer (nullable = false)
 |-- net2_c: integer (nullable = false)
 |-- net2_d: integer (nullable = false)
 |-- net2_e: integer (nullable = false)
 |-- net3: string (nullable = false)
 |-- net3_a: integer (nullable = false)
 |-- net3_b: integer (nullable = false)
 |-- net3_c: integer (nullable = false)
 |-- net3_d: integer (nullable = false)
 |-- net3_e: integer (nullable = false)
 |-- net4: string (nullable = false)
 |-- net4_a: integer (nullable = false)
 |-- net4_b: integer (nullable = false)
 |-- net4_c: integer (nullable = false)
 |-- net4_d: integer (nullable = false)
 |-- net4_e: integer (nullable = false)
 |-- net5: string (nullable = false)
 |-- net5_a: integer (nullable = false)
 |-- net5_b: integer (nullable = false)
 |-- net5_c: integer (nullable = false)
 |-- net5_d: integer (nullable = false)
 |-- net5_e: integer (nullable = false)

scala> val sub_dfs = df.schema.map(_.name).grouped(6).map{fields => df.select(fields.map(col): _*).where(col(fields.head).isNotNull)}
sub_dfs: Iterator[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = non-empty iterator

scala> sub_dfs.foreach{_.printSchema}
root
 |-- net1: string (nullable = false)
 |-- net1_a: integer (nullable = false)
 |-- net1_b: integer (nullable = false)
 |-- net1_c: integer (nullable = false)
 |-- net1_d: integer (nullable = false)
 |-- net1_e: integer (nullable = false)

root
 |-- net2: string (nullable = false)
 |-- net2_a: integer (nullable = false)
 |-- net2_b: integer (nullable = false)
 |-- net2_c: integer (nullable = false)
 |-- net2_d: integer (nullable = false)
 |-- net2_e: integer (nullable = false)

root
 |-- net3: string (nullable = false)
 |-- net3_a: integer (nullable = false)
 |-- net3_b: integer (nullable = false)
 |-- net3_c: integer (nullable = false)
 |-- net3_d: integer (nullable = false)
 |-- net3_e: integer (nullable = false)

root
 |-- net4: string (nullable = false)
 |-- net4_a: integer (nullable = false)
 |-- net4_b: integer (nullable = false)
 |-- net4_c: integer (nullable = false)
 |-- net4_d: integer (nullable = false)
 |-- net4_e: integer (nullable = false)

root
 |-- net5: string (nullable = false)
 |-- net5_a: integer (nullable = false)
 |-- net5_b: integer (nullable = false)
 |-- net5_c: integer (nullable = false)
 |-- net5_d: integer (nullable = false)
 |-- net5_e: integer (nullable = false)
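
If the sub-DataFrames should stay addressable by group name rather than be consumed once from the iterator, the same expression can feed a map (a sketch, under the same 6-column assumption, with org.apache.spark.sql.functions.col imported as above):

val subDfsByName =
  df.schema.map(_.name).grouped(6).map { fields =>
    // Key each sub-DataFrame by its leading column name, e.g. "net1"
    fields.head -> df.select(fields.map(col): _*).where(col(fields.head).isNotNull)
  }.toMap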

Upvotes: 2
