Reputation: 81
I have some lines of space-separated input data:
Name Company Start_Date End_Date
Naresh HDFC 2017-01-01 2017-03-31
Anoop ICICI 2017-05-01 2017-07-30
I need the output as:
Naresh HDFC 2017 01
Naresh HDFC 2017 02
Naresh HDFC 2017 03
Anoop ICICI 2017 05
Anoop ICICI 2017 06
Anoop ICICI 2017 07
I have made a text file of this data and placed it on my Hadoop cluster, and I have written the code below, but I am having a problem getting the output. Kindly help.
I do not understand how to extract the month from each entry and use it in the range function, so I have hard-coded a value of 3 in range.
Code:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
import datetime

sc = SparkContext()
sqlcon = SQLContext(sc)

month_map = {'01': 1, '02': 2, '03': 3, '04': 4, '05': 5, '06': 6,
             '07': 7, '08': 8, '09': 9, '10': 10, '11': 11, '12': 12}

# parse a 'YYYY-MM-DD' string into a datetime.date
def get_month(s):
    return datetime.date(int(s[:4]), month_map[s[5:7]], int(s[8:10]))

def parse_line(line):
    match = line.split()
    return Row(name=match[0], type=match[1],
               start_date=get_month(match[2]), end_date=get_month(match[3]))

# ----------------- create RDD ---------------
filepath = '/user/vikasmittal/Innovacer_data.txt'
rdd1 = sc.textFile(filepath)
rdd2 = rdd1.map(parse_line)

# the 3 is hard-coded; I don't know how to derive the month range from the data
for i in range(3):
    rdd3 = rdd2.map(lambda l: (l.name, l.type, l.start_date.year, i))
    print(rdd3.collect())
Upvotes: 2
Views: 15206
Reputation: 10076
After loading your data, transform it into a dataframe and cast Start_Date and End_Date as dates, using either to_date or cast("date"):
import pyspark.sql.functions as psf
df = sqlcon\
.createDataFrame(rdd2, ['Name', 'Company', 'Start_Date', 'End_Date'])\
.withColumn("Start_Date", psf.to_date("Start_Date"))\
.withColumn("End_Date", psf.to_date("End_Date"))
df.show()
+------+-------+----------+----------+
| Name|Company|Start_Date| End_Date|
+------+-------+----------+----------+
|Naresh| HDFC|2017-01-01|2017-03-31|
| Anoop| ICICI|2017-05-01|2017-07-30|
+------+-------+----------+----------+
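For reference, a sketch of the cast("date") alternative mentioned above; it produces the same schema, assuming the same rdd2:

import pyspark.sql.functions as psf
# equivalent variant: cast the string columns instead of calling to_date
df = sqlcon\
    .createDataFrame(rdd2, ['Name', 'Company', 'Start_Date', 'End_Date'])\
    .withColumn("Start_Date", psf.col("Start_Date").cast("date"))\
    .withColumn("End_Date", psf.col("End_Date").cast("date"))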
We'll apply a UDF to compute the range of dates between Start_Date and End_Date:
from dateutil.relativedelta import relativedelta

def month_range(d1, d2):
    return [d1 + relativedelta(months=+x)
            for x in range((d2.year - d1.year) * 12 + d2.month - d1.month + 1)]

import pyspark.sql.functions as psf
from pyspark.sql.types import *
month_range_udf = psf.udf(month_range, ArrayType(DateType()))
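As a quick sanity check (plain Python, no Spark needed), month_range yields one date per month of the interval, anchored to the start day:

from datetime import date
# expected: [datetime.date(2017, 1, 1), datetime.date(2017, 2, 1), datetime.date(2017, 3, 1)]
print(month_range(date(2017, 1, 1), date(2017, 3, 31)))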
We can now apply it to Start_Date and End_Date and explode the array to get only one date per row:
df = df.withColumn("Date", psf.explode(month_range_udf("Start_Date", "End_Date")))
df.show()
+------+-------+----------+----------+----------+
| Name|Company|Start_Date| End_Date| Date|
+------+-------+----------+----------+----------+
|Naresh| HDFC|2017-01-01|2017-03-31|2017-01-01|
|Naresh| HDFC|2017-01-01|2017-03-31|2017-02-01|
|Naresh| HDFC|2017-01-01|2017-03-31|2017-03-01|
| Anoop| ICICI|2017-05-01|2017-07-30|2017-05-01|
| Anoop| ICICI|2017-05-01|2017-07-30|2017-06-01|
| Anoop| ICICI|2017-05-01|2017-07-30|2017-07-01|
+------+-------+----------+----------+----------+
We can now extract year and month from the Date column:
res = df.select(
"Name",
"Company",
psf.year("Date").alias("year"),
psf.month("Date").alias("month")
)
res.show()
+------+-------+----+-----+
| Name|Company|year|month|
+------+-------+----+-----+
|Naresh| HDFC|2017| 1|
|Naresh| HDFC|2017| 2|
|Naresh| HDFC|2017| 3|
| Anoop| ICICI|2017| 5|
| Anoop| ICICI|2017| 6|
| Anoop| ICICI|2017| 7|
+------+-------+----+-----+
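As a side note, if you are on Spark 2.4 or later you could skip the Python UDF entirely and build the same date array with the built-in sequence SQL function; a sketch, assuming a df that still has the casted Start_Date and End_Date columns from the first step:

import pyspark.sql.functions as psf
# sequence(start, stop, step) generates an array of dates; explode flattens it into rows
df_alt = df.withColumn(
    "Date",
    psf.explode(psf.expr("sequence(Start_Date, End_Date, interval 1 month)"))
)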
Upvotes: 3
Reputation: 2468
You can use pyspark's to_date function, as explained here.
Just import everything from pyspark.sql.functions (from pyspark.sql.functions import *):
>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_date(df.t).alias('date')).collect()
[Row(date=datetime.date(1997, 2, 28))]
You can extract the month as follows:
>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(month('a').alias('month')).collect()
[Row(month=4)]
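Putting the two together on a date string like yours (a minimal sketch; names are illustrative):

>>> df = sqlContext.createDataFrame([('Naresh', '2017-01-01')], ['Name', 'Start_Date'])
>>> df.select(month(to_date(df.Start_Date)).alias('month')).collect()
[Row(month=1)]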
Upvotes: 1