Reputation: 1886
I have loaded CSV data into a Spark DataFrame.
I need to slice this DataFrame into two different DataFrames, where each one contains a subset of the columns of the original DataFrame.
How do I select a column-based subset of a Spark DataFrame into a new DataFrame?
Upvotes: 43
Views: 220969
Reputation: 18475
There are multiple options (especially in Scala) to select a subset of columns of a DataFrame. The following lines show the options; most of them are documented in the ScalaDocs of Column:
import spark.implicits._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, column, expr}
inputDf.select(col("colA"), col("colB"))
inputDf.select(inputDf.col("colA"), inputDf.col("colB"))
inputDf.select(column("colA"), column("colB"))
inputDf.select(expr("colA"), expr("colB"))
// only available in Scala
inputDf.select($"colA", $"colB")
inputDf.select('colA, 'colB) // makes use of Scala's Symbol
// selecting columns based on a given iterable of Strings
val selectedColumns: Seq[Column] = Seq("colA", "colB").map(c => col(c))
inputDf.select(selectedColumns: _*)
// Special cases
col("columnName.field") // Extracting a struct field
col("`a.column.with.dots`") // Escape `.` in column names.
// select the first or last 2 columns
inputDf.selectExpr(inputDf.columns.take(2): _*)
inputDf.selectExpr(inputDf.columns.takeRight(2): _*)
The usage of $ is possible because Scala provides an implicit class that converts a String into a Column using the method $:
implicit class StringToColumn(val sc : scala.StringContext) extends scala.AnyRef {
def $(args : scala.Any*) : org.apache.spark.sql.ColumnName = { /* compiled code */ }
}
Typically, when you derive multiple DataFrames from one DataFrame, it can improve performance to persist
the original DataFrame before creating the others. At the end you can unpersist
the original DataFrame, as sketched below.
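A minimal sketch of that persist/unpersist pattern (a hypothetical example, reusing the colA/colB/colC columns from the CSV example further down):
// cache the original once, derive both DataFrames from it, then release the cache
val cachedDf = inputDf.persist()
val leftDf = cachedDf.select("colA", "colB")
val rightDf = cachedDf.select("colB", "colC")
// materialize both derived DataFrames before dropping the cache
leftDf.count()
rightDf.count()
cachedDf.unpersist()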
Keep in mind that Columns are not resolved at compile time but only when they are compared against the column names of your catalog, which happens during the analyser phase of query execution. If you need stronger type safety, you could create a Dataset.
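For example, a minimal sketch of the Dataset approach, assuming a hand-written case class (Record is hypothetical) that mirrors the three-column CSV below:
import spark.implicits._
// hypothetical case class matching the CSV schema shown below
case class Record(colA: String, colB: String, colC: String)
val inputDs = inputDf.as[Record]
// column access is now checked at compile time
val firstTwo = inputDs.map(r => (r.colA, r.colB))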
For completeness, here is the CSV used to try out the above code:
// csv file:
// colA,colB,colC
// 1,"foo","bar"
val inputDf = spark.read.format("csv").option("header", "true").load(csvFilePath)
// resulting DataFrame schema
root
|-- colA: string (nullable = true)
|-- colB: string (nullable = true)
|-- colC: string (nullable = true)
Upvotes: 11
Reputation: 107
I liked dehasis' approach, because it allowed me to select, rename and convert columns in one step. However, I had to adjust it to make it work for me in PySpark:
from pyspark.sql.functions import col

spark.read.csv(path).select(
    col('_c0').alias("stn").cast('string'),
    col('_c1').alias("wban").cast('string'),
    col('_c2').alias("lat").cast('double'),
    col('_c3').alias("lon").cast('double')
).where(
    col('_c2').isNotNull() & col('_c3').isNotNull()
    & (col('_c2') != 0.0) & (col('_c3') != 0.0)
)
Upvotes: 3
Reputation: 714
My problem was to select the columns of one DataFrame after joining it with another DataFrame.
I tried the below and selected the columns of salaryDf from the joined DataFrame.
Hope this will help.
val empDf=spark.read.option("header","true").csv("/data/tech.txt")
val salaryDf=spark.read.option("header","true").csv("/data/salary.txt")
val joinData= empDf.join(salaryDf,empDf.col("first") === salaryDf.col("first") and empDf.col("last") === salaryDf.col("last"))
// below will select the columns of salaryDf only
val finalDF=joinData.select(salaryDf.columns map salaryDf.col:_*)
//same way we can select the columns of empDf
joinData.select(empDf.columns map empDf.col:_*)
Upvotes: 0
Reputation: 8519
If you want to split your DataFrame into two different ones, do two selects on it with the different columns you want.
val sourceDf = spark.read.csv(...)
val df1 = sourceDf.select("first column", "second column", "third column")
val df2 = sourceDf.select("fourth column", "fifth column", "sixth column")
Note that this of course means that the sourceDf would be evaluated twice, so if it can fit into distributed memory and you use most of the columns across both dataframes, it might be a good idea to cache it. If it has many extra columns that you don't need, then you can do a select on it first to pick only the columns you will need, so that the cache doesn't store all that extra data in memory.
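As a rough sketch of that idea, assuming (hypothetically) that only four of the source columns are needed across both DataFrames:
// prune to the columns both target DataFrames need, then cache the pruned DataFrame once
val prunedDf = sourceDf.select("first column", "second column", "third column", "fourth column").cache()
val df1 = prunedDf.select("first column", "second column")
val df2 = prunedDf.select("third column", "fourth column")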
Upvotes: 51
Reputation: 11
You can use the code below to select columns based on their index (position). You can alter the numbers in the variable colNos to select only those columns:
import org.apache.spark.sql.functions.col
val colNos = Seq(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35)
val Df_01 = Df.select(colNos map Df.columns map col: _*)
Df_01.show(20, false)
Upvotes: 0
Reputation: 293
Let's say our parent DataFrame has 'n' columns,
and we can create 'x' child DataFrames (let's consider 2 in our case).
The columns for a child DataFrame can be chosen as desired from any of the parent DataFrame's columns.
Consider a source with 10 columns that we want to split into 2 DataFrames containing columns referenced from the parent DataFrame.
The columns for the child DataFrames can be decided using the select DataFrame API:
val parentDF = spark.read.format("csv").load("/path of the CSV file")
val child1_DF = parentDF.select("col1", "col2", "col3", "col9", "col10")
val child2_DF = parentDF.select("col5", "col6", "col7", "col8", "col1", "col2")
child1_DF.show()
child2_DF.show()
Notice that the column counts in the child DataFrames can differ and will be less than the parent DataFrame's column count.
We can also refer to the columns without mentioning their real names, using the positional names the parent DataFrame assigns to them.
First import spark implicits, which also provide the helper class for the $-notation used further below:
import spark.implicits._
import org.apache.spark.sql.functions._
val child3_DF = parentDF.select("_c0", "_c1", "_c2", "_c8", "_c9")
child3_DF.show()
We can also select columns based on certain conditions. Let's say we want only even-numbered columns to be selected in the child DataFrame. By even we refer to even-indexed columns, with indexing starting from '0':
val parentColumns = parentDF.columns.toList
// List(_c0, _c1, _c2, _c3, _c4, _c5, _c6, _c7, _c8, _c9)
val evenParentColumns = parentColumns.zipWithIndex.filter(_._2 % 2 == 0).map(_._1)
// List(_c0, _c2, _c4, _c6, _c8)
Now feed these columns to be selected from the parentDF. Note that select takes a head column name plus a varargs Seq of the remaining column names, so we pass the head and tail of evenParentColumns:
val child4_DF = parentDF.select(evenParentColumns.head, evenParentColumns.tail: _*)
child4_DF.show()
This will show the even indexed columns from the parent Dataframe.
|        _c0| _c2| _c4|_c6|_c8|
|ITE00100554|TMAX|null|  E|  1|
| TE00100554|TMIN|null|  E|  4|
|GM000010962|PRCP|null|  E|  7|
So now we are left with the even-indexed columns in the DataFrame.
Similarly, we can also apply other operations to the DataFrame columns, as shown below:
val child5_DF = parentDF.select($"_c0", $"_c8" + 1)
child5_DF.show()
So, in the many ways mentioned above, we can select columns in the DataFrame.
Upvotes: 6
Reputation: 2773
Just by using select
you can select particular columns, give them readable names, and cast them. For example, like this:
import spark.implicits._
import org.apache.spark.sql.types.{DoubleType, StringType}

spark.read.csv(path).select(
  '_c0.alias("stn").cast(StringType),
  '_c1.alias("wban").cast(StringType),
  '_c2.alias("lat").cast(DoubleType),
  '_c3.alias("lon").cast(DoubleType)
).where('_c2.isNotNull && '_c3.isNotNull && '_c2 =!= 0.0 && '_c3 =!= 0.0)
Upvotes: 0
Reputation: 1886
Solved, just use the select method on the DataFrame to select columns:
val df=spark.read.csv("C:\\Users\\Ahmed\\Desktop\\cabs_trajectories\\cabs_trajectories\\green\\2014\\green_tripdata_2014-09.csv")
val df1=df.select("_c0")
This gives a DataFrame containing just the first column of the original DataFrame.
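To get the two DataFrames from the original question, the same call can simply be made twice with different column lists (the _c* names below are just the default names Spark assigns to a header-less CSV):
// split into two column-based subsets
val df1 = df.select("_c0", "_c1", "_c2")
val df2 = df.select("_c3", "_c4")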
Upvotes: 1