I need to read a CSV file from S3. It contains string and double data, but I read every column as a string, which gives me a DynamicFrame containing only strings. I want to process each row of data like this:
Storename,code,created_date,performancedata,accumulateddata,maxmontlydata
GHJ 0,GHJ0000001,2020-03-31,0015.5126-,0024.0446-,0017.1811-
MULT,C000000001,2020-03-31,0015.6743-,0024.4533-,0018.0719-
Below is the code I have written so far:
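import re
from decimal import Decimal

def ConvertToDec(myString):
    # Expected format: up to 4 digits, an optional fraction and an optional
    # trailing "-" marking a negative value, e.g. "0015.5126-"
    pattern = re.compile(r"[0-9]{1,4}(\.[0-9]{1,4})?-?")
    myString = myString.strip()
    # fullmatch (not match) so the whole string must fit the pattern
    if not pattern.fullmatch(myString):
        return Decimal("-9999.9999")  # sentinel for unparseable values
    if myString.endswith("-"):
        # Decimal() cannot parse a trailing sign, so strip it and negate
        return -Decimal(myString[:-1])
    return Decimal(myString)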
import datetime
from pyspark.sql import Row

def rowwise_function(row):
    row_dict = row.asDict()
    # LC_CODE = code (or "CD" if missing) + performancedata (or "HJ" if missing)
    if row_dict['code']:
        data = row_dict['code']
    else:
        data = 'CD'
    if row_dict['performancedata']:
        data = data + row_dict['performancedata']
    else:
        data = data + 'HJ'
    # New columns
    row_dict['LC_CODE'] = data
    row_dict['CD_CD'] = 123
    row_dict['GBL'] = 123.345
    # Convert existing string columns to typed values
    if row_dict["created_date"]:
        row_dict["created_date"] = datetime.datetime.strptime(row_dict["created_date"], '%Y-%m-%d')
    if row_dict["performancedata"]:
        row_dict["performancedata"] = ConvertToDec(row_dict["performancedata"])
    return Row(**row_dict)
# The sample above is comma-separated; set sep to match the actual file's delimiter
store_df = spark.read.option("header", "true").csv("C:\\STOREDATA.TXT", sep="|")
ratings_rdd = store_df.rdd
ratings_rdd_new = ratings_rdd.map(rowwise_function)
updatedDF = spark.createDataFrame(ratings_rdd_new)
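Because the row logic only works on a dict, it can be checked locally without a Spark session. A minimal sketch, assuming rows are plain Python dicts keyed by the sample header's column names and that a trailing "-" marks a negative value; transform_row is a hypothetical name, not part of the original code:

```python
import datetime
from decimal import Decimal


def transform_row(row_dict):
    """Plain-dict version of rowwise_function (hypothetical helper for local testing)."""
    out = dict(row_dict)  # copy so the input dict is not mutated
    code = out.get("code") or "CD"
    perf = out.get("performancedata")
    out["LC_CODE"] = code + (perf if perf else "HJ")
    out["CD_CD"] = 123
    out["GBL"] = 123.345
    if out.get("created_date"):
        out["created_date"] = datetime.datetime.strptime(out["created_date"], "%Y-%m-%d")
    if perf:
        perf = perf.strip()
        # Assumption: a trailing "-" means negative, e.g. "0015.5126-" -> -15.5126
        if perf.endswith("-"):
            out["performancedata"] = -Decimal(perf[:-1])
        else:
            out["performancedata"] = Decimal(perf)
    return out


sample = {
    "Storename": "GHJ 0",
    "code": "GHJ0000001",
    "created_date": "2020-03-31",
    "performancedata": "0015.5126-",
    "accumulateddata": "0024.0446-",
    "maxmontlydata": "0017.1811-",
}
result = transform_row(sample)
print(result["LC_CODE"])          # GHJ00000010015.5126-
print(result["performancedata"])  # -15.5126
```

Copying the dict keeps the original row untouched, which makes before/after comparisons easy while debugging.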
Basically, I am creating an almost entirely new DataFrame. My questions are below:
Answer:
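Use Spark DataFrames/SQL instead of the RDD API. You don't need any low-level data operations; all of your transformations are column-level, so DataFrames are easier to use and more efficient (built-in column expressions run inside Spark instead of shipping every row through a Python function).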
To create new columns, use .withColumn(<col_name>, <expression/value>).
All the if's that choose a value can be expressed with when(...).otherwise(...); use .filter only if the goal is to drop rows.
The whole ConvertToDec function can be written more simply using strip() together with the ast module or float().
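A sketch of the float() route (ast is left out here because ast.literal_eval rejects integer strings with leading zeros such as "0015"). convert_to_float is a hypothetical name; the trailing "-" is assumed to mean a negative number, and the -9999.9999 sentinel is kept from the question:

```python
def convert_to_float(my_string, sentinel=-9999.9999):
    """Parse values like '0015.5126-', where a trailing '-' marks a negative number."""
    s = my_string.strip()
    negative = s.endswith("-")
    if negative:
        s = s[:-1]  # float() does not accept a trailing sign
    try:
        value = float(s)
    except ValueError:
        # Empty or non-numeric input: fall back to the question's sentinel
        return sentinel
    return -value if negative else value


print(convert_to_float("0015.5126-"))  # -15.5126
print(convert_to_float("abc"))         # -9999.9999
```

In Spark such a function would have to be wrapped in a UDF; built-in column functions are usually faster when they can express the same logic.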