Reputation: 141
I have a large dataframe with more than 100 columns, where sets of columns share the same names apart from a unique number. Multiple smaller dataframes need to be created based on this unique number.
The column names all follow the same pattern, and the number of such groups could sometimes be 64 and sometimes 128: net1, net2, net3...net64...net128.
So I need 64 or 128 sub-dataframes. I cannot use startsWith because a column name like net1 would also match net10, net11...net100, net101...
I have created a solution in Spark + Scala that works fine, but I feel there must be an easier way to achieve it dynamically.
df_net.printSchema()
|-- net1: string (nullable = true)
|-- net1_a: integer (nullable = true)
|-- net1_b: integer (nullable = true)
|-- net1_c: integer (nullable = true)
|-- net1_d: integer (nullable = true)
|-- net1_e: integer (nullable = true)
|-- net2: string (nullable = true)
|-- net2_a: integer (nullable = true)
|-- net2_b: integer (nullable = true)
|-- net2_c: integer (nullable = true)
|-- net2_d: integer (nullable = true)
|-- net2_e: integer (nullable = true)
|-- net3: string (nullable = true)
|-- net3_a: integer (nullable = true)
|-- net3_b: integer (nullable = true)
|-- net3_c: integer (nullable = true)
|-- net3_d: integer (nullable = true)
|-- net3_e: integer (nullable = true)
|-- net4: string (nullable = true)
|-- net4_a: integer (nullable = true)
|-- net4_b: integer (nullable = true)
|-- net4_c: integer (nullable = true)
|-- net4_d: integer (nullable = true)
|-- net4_e: integer (nullable = true)
|-- net5: string (nullable = true)
|-- net5_a: integer (nullable = true)
|-- net5_b: integer (nullable = true)
|-- net5_c: integer (nullable = true)
|-- net5_d: integer (nullable = true)
|-- net5_e: integer (nullable = true)
val df_net1 = df_net
  .filter($"net1".isNotNull)
  .select("net1", "net1_a", "net1_b", "net1_c", "net1_d", "net1_e")

val df_net2 = df_net
  .filter($"net2".isNotNull)
  .select("net2", "net2_a", "net2_b", "net2_c", "net2_d", "net2_e")

val df_net3 = df_net
  .filter($"net3".isNotNull)
  .select("net3", "net3_a", "net3_b", "net3_c", "net3_d", "net3_e")
These are the smaller data frames, filtered based on the unique number.
Upvotes: 1
Views: 229
Reputation: 29237
Assuming that the columns share common prefix names, this solution will work for a variable number of columns with the same prefix.
package examples
import org.apache.log4j.Level
import org.apache.spark.sql.{DataFrame, SparkSession}
object FilterDataframes extends App {
  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)

  val spark = SparkSession.builder()
    .appName(this.getClass.getName)
    .config("spark.master", "local[*]").getOrCreate()

  import spark.implicits._

  val df = spark
    .sparkContext.parallelize(Seq(new MyNets())).toDF
  df.show

  case class MyNets(
      net1: Int = 1,
      net1_a: Int = 2,
      net1_b: Int = 3,
      net1_c: Int = 4,
      net1_d: Int = 4,
      net1_e: Int = 5,
      net2: Int = 6,
      net2_a: Int = 7,
      net2_b: Int = 8,
      net2_c: Int = 9,
      net2_d: Int = 10,
      net2_e: Int = 11,
      net3: Int = 12,
      net3_a: Int = 13,
      net3_b: Int = 14,
      net3_c: Int = 15,
      net3_d: Int = 16,
      net4_e: Int = 17,
      net5: Int = 18,
      net5_a: Int = 19,
      net5_b: Int = 20,
      net5_c: Int = 21,
      net5_d: Int = 22,
      net5_e: Int = 23
  )

  val net1: DataFrame = df.select(df.columns.filter(_.startsWith("net1")).map(df(_)): _*)
  val net2: DataFrame = df.select(df.columns.filter(_.startsWith("net2")).map(df(_)): _*)
  val net3: DataFrame = df.select(df.columns.filter(_.startsWith("net3")).map(df(_)): _*)
  val net4: DataFrame = df.select(df.columns.filter(_.startsWith("net4")).map(df(_)): _*)
  val net5: DataFrame = df.select(df.columns.filter(_.startsWith("net5")).map(df(_)): _*)

  net1.show
  net2.show
  net3.show
  net4.show
  net5.show
}
Result:
+----+------+------+------+------+------+----+------+------+------+------+------+----+------+------+------+------+------+----+------+------+------+------+------+
|net1|net1_a|net1_b|net1_c|net1_d|net1_e|net2|net2_a|net2_b|net2_c|net2_d|net2_e|net3|net3_a|net3_b|net3_c|net3_d|net4_e|net5|net5_a|net5_b|net5_c|net5_d|net5_e|
+----+------+------+------+------+------+----+------+------+------+------+------+----+------+------+------+------+------+----+------+------+------+------+------+
|   1|     2|     3|     4|     4|     5|   6|     7|     8|     9|    10|    11|  12|    13|    14|    15|    16|    17|  18|    19|    20|    21|    22|    23|
+----+------+------+------+------+------+----+------+------+------+------+------+----+------+------+------+------+------+----+------+------+------+------+------+

+----+------+------+------+------+------+
|net1|net1_a|net1_b|net1_c|net1_d|net1_e|
+----+------+------+------+------+------+
|   1|     2|     3|     4|     4|     5|
+----+------+------+------+------+------+

+----+------+------+------+------+------+
|net2|net2_a|net2_b|net2_c|net2_d|net2_e|
+----+------+------+------+------+------+
|   6|     7|     8|     9|    10|    11|
+----+------+------+------+------+------+

+----+------+------+------+------+
|net3|net3_a|net3_b|net3_c|net3_d|
+----+------+------+------+------+
|  12|    13|    14|    15|    16|
+----+------+------+------+------+

+------+
|net4_e|
+------+
|    17|
+------+

+----+------+------+------+------+------+
|net5|net5_a|net5_b|net5_c|net5_d|net5_e|
+----+------+------+------+------+------+
|  18|    19|    20|    21|    22|    23|
+----+------+------+------+------+------+
Now you can do a null check on the resulting dataframes.
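For example, a minimal null-check sketch on one of the resulting dataframes (assuming the base column can be null, as in the question's schema):

// keep only rows where the base net1 column is populated
val net1NonNull = net1.filter(net1("net1").isNotNull)
net1NonNull.show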
UPDATE: since you have 64 or 128 nets, match either the bare column name or the name followed by a _ character, so that, for example, net1 does not overlap with net10 or net100:
var i = 0

i += 1
val net1: DataFrame = df.select(df.columns.filter(c => c == s"net$i" || c.startsWith(s"net${i}_")).map(df(_)): _*)
i += 1
val net2: DataFrame = df.select(df.columns.filter(c => c == s"net$i" || c.startsWith(s"net${i}_")).map(df(_)): _*)
i += 1
val net3: DataFrame = df.select(df.columns.filter(c => c == s"net$i" || c.startsWith(s"net${i}_")).map(df(_)): _*)
i += 1
val net4: DataFrame = df.select(df.columns.filter(c => c == s"net$i" || c.startsWith(s"net${i}_")).map(df(_)): _*)
i += 1
val net5: DataFrame = df.select(df.columns.filter(c => c == s"net$i" || c.startsWith(s"net${i}_")).map(df(_)): _*)
This will still produce the same output as shown above, without overlapping with other nets.
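To build all of the sub-dataframes dynamically rather than one val per net, the same filter can run in a loop; a minimal sketch, where n and subDfs are hypothetical names and n is 64 or 128 depending on the input:

val n = 64 // or 128, depending on the input
val subDfs: Map[String, DataFrame] = (1 to n).map { i =>
  val prefix = s"net$i"
  // bare column (net1) plus its underscore-separated variants (net1_a, net1_b, ...)
  val cols = df.columns.filter(c => c == prefix || c.startsWith(prefix + "_"))
  prefix -> df.select(cols.map(df(_)): _*).filter(df(prefix).isNotNull)
}.toMap

subDfs("net1").show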
Upvotes: 1
Reputation: 8523
I would collapse your different groups of net fields into one set with a net_type field. Then you could do a partitioned write, which would allow you to easily load an individual set, or more than one, as needed.
This gives you a couple of benefits: every row carries its group in a single net_type column, and the partitioned layout means you can load just the groups you need.
Here is the code to do it:
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for .toDS below (already in scope in spark-shell)
case class Net(net1:Integer,
net1_a:Integer,
net1_b:Integer,
net2:Integer,
net2_a:Integer,
net2_b:Integer)
val df = Seq(
Net(1, 1, 1, null, null, null),
Net(2, 2, 2, null, null, null),
Net(null, null, null, 3, 3, 3)
).toDS
// You could find these automatically if you wanted
val columns = Seq("net1", "net2")
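// For example, one way to derive them automatically (an assumption: the base
// columns are exactly the names that contain no underscore):
// val columns = df.columns.filter(!_.contains("_")).toSeq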
// Turn each group of fields into a struct with a populated "net_type" field
val structColumns = columns.map(c =>
when(col(c).isNotNull,
struct(
lit(c) as "net_type",
col(c) as "net",
col(c + "_a") as "net_a",
col(c + "_b") as "net_b"
)
)
)
// Put into one column the populated group for each row
val df2 = df.select(coalesce(structColumns:_*) as "net")
// Flatten back down to top level fields instead of being in a struct
val df3 = df2.selectExpr("net.*")
// write df3, which carries the net_type column, partitioned by it
df3.write.partitionBy("net_type").parquet("/some/file/path.parquet")
This would give you rows like this:
scala> df3.show
+--------+---+-----+-----+
|net_type|net|net_a|net_b|
+--------+---+-----+-----+
| net1| 1| 1| 1|
| net1| 2| 2| 2|
| net2| 3| 3| 3|
+--------+---+-----+-----+
And files on your file system like this:
/some/file/path.parquet/
net_type=net1/
part1.parquet
..
net_type=net2/
part1.parquet
..
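With that layout, reading a single group back is a filtered read that benefits from partition pruning; a minimal sketch, reusing the same placeholder path:

// only the net_type=net1 directory is scanned, thanks to partition pruning
val net1Back = spark.read.parquet("/some/file/path.parquet")
  .where(col("net_type") === "net1")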
Upvotes: 0
Reputation: 339
It seems the columns in the dataframe follow a pattern, since they start with a common string. If that will not change, you can use something like the code below.
val df_net1 = df.select(df.columns.filter(a => a.startsWith("net1")).map(a => df(a)): _*)
val df_net2 = df.select(df.columns.filter(a => a.startsWith("net2")).map(a => df(a)): _*)
val df_net3 = df.select(df.columns.filter(a => a.startsWith("net3")).map(a => df(a)): _*)
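Note that a bare startsWith("net1") also matches net10, net100, and so on, which the question rules out. One way around that is to anchor the prefix with a regex instead; a sketch of the same select with only the filter swapped:

// "net1" followed by nothing or by an underscore suffix, so net10 does not match
val df_net1 = df.select(df.columns.filter(a => a.matches("net1(_.*)?")).map(a => df(a)): _*)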
Upvotes: 1
Reputation: 1380
Assuming your DF is split predictably into groups of 6 columns, the following will produce an Iterator[Dataset]
where each element contains 6 columns from the parent dataset:
scala> df.printSchema
root
|-- net1: string (nullable = false)
|-- net1_a: integer (nullable = false)
|-- net1_b: integer (nullable = false)
|-- net1_c: integer (nullable = false)
|-- net1_d: integer (nullable = false)
|-- net1_e: integer (nullable = false)
|-- net2: string (nullable = false)
|-- net2_a: integer (nullable = false)
|-- net2_b: integer (nullable = false)
|-- net2_c: integer (nullable = false)
|-- net2_d: integer (nullable = false)
|-- net2_e: integer (nullable = false)
|-- net3: string (nullable = false)
|-- net3_a: integer (nullable = false)
|-- net3_b: integer (nullable = false)
|-- net3_c: integer (nullable = false)
|-- net3_d: integer (nullable = false)
|-- net3_e: integer (nullable = false)
|-- net4: string (nullable = false)
|-- net4_a: integer (nullable = false)
|-- net4_b: integer (nullable = false)
|-- net4_c: integer (nullable = false)
|-- net4_d: integer (nullable = false)
|-- net4_e: integer (nullable = false)
|-- net5: string (nullable = false)
|-- net5_a: integer (nullable = false)
|-- net5_b: integer (nullable = false)
|-- net5_c: integer (nullable = false)
|-- net5_d: integer (nullable = false)
|-- net5_e: integer (nullable = false)
scala> val sub_dfs = df.schema.map(_.name).grouped(6).map{fields => df.select(fields.map(col): _*).where(col(fields.head).isNotNull)}
sub_dfs: Iterator[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = non-empty iterator
scala> sub_dfs.foreach{_.printSchema}
root
|-- net1: string (nullable = false)
|-- net1_a: integer (nullable = false)
|-- net1_b: integer (nullable = false)
|-- net1_c: integer (nullable = false)
|-- net1_d: integer (nullable = false)
|-- net1_e: integer (nullable = false)
root
|-- net2: string (nullable = false)
|-- net2_a: integer (nullable = false)
|-- net2_b: integer (nullable = false)
|-- net2_c: integer (nullable = false)
|-- net2_d: integer (nullable = false)
|-- net2_e: integer (nullable = false)
root
|-- net3: string (nullable = false)
|-- net3_a: integer (nullable = false)
|-- net3_b: integer (nullable = false)
|-- net3_c: integer (nullable = false)
|-- net3_d: integer (nullable = false)
|-- net3_e: integer (nullable = false)
root
|-- net4: string (nullable = false)
|-- net4_a: integer (nullable = false)
|-- net4_b: integer (nullable = false)
|-- net4_c: integer (nullable = false)
|-- net4_d: integer (nullable = false)
|-- net4_e: integer (nullable = false)
root
|-- net5: string (nullable = false)
|-- net5_a: integer (nullable = false)
|-- net5_b: integer (nullable = false)
|-- net5_c: integer (nullable = false)
|-- net5_d: integer (nullable = false)
|-- net5_e: integer (nullable = false)
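Note that an Iterator can only be traversed once, so the printSchema loop above consumes it. If the sub-dataframes need to be reused, they can be materialized first, for example into a map keyed by each group's first column (a sketch under the same six-column assumption; subDfMap is a hypothetical name):

val subDfMap = df.schema.map(_.name).grouped(6).map { fields =>
  fields.head -> df.select(fields.map(col): _*).where(col(fields.head).isNotNull)
}.toMap

subDfMap("net1").show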
Upvotes: 2