sumedha

Reputation: 489

Pyspark Schema update/alter Dataframe

I need to read a CSV file from S3. It contains string and double data, but I will read everything as strings, which gives me a dynamic frame of only strings. For each row I want to:

  1. Concatenate a few columns and create a new column
  2. Add new columns
  3. Convert the value in the 3rd column from string to date
  4. Convert the values of columns 4, 5 and 6 individually from string to decimal
Sample data:

Storename,code,created_date,performancedata,accumulateddata,maxmontlydata
GHJ 0,GHJ0000001,2020-03-31,0015.5126-,0024.0446-,0017.1811-
MULT,C000000001,2020-03-31,0015.6743-,0024.4533-,0018.0719-

Below is the code that I have written so far:

import datetime
import re
from decimal import Decimal

from pyspark.sql import Row


def ConvertToDec(myString):
    # values such as '0015.5126-' mark negatives with a trailing minus sign
    pattern = re.compile(r"[0-9]{0,4}\.?[0-9]{0,4}-?")
    myString = myString.strip()
    if myString and not pattern.match(myString):
        doubleVal = Decimal("-9999.9999")
    else:
        # move the trailing minus to the front so Decimal can parse it
        if myString.endswith("-"):
            myString = "-" + myString[:-1]
        doubleVal = Decimal(myString)
    return doubleVal

def rowwise_function(row):
    row_dict = row.asDict()
    data = 'd'
    if not row_dict['code']:
        data = row_dict['code']
    else:
        data = 'CD'
    if not row_dict['performancedata']:
        data = data + row_dict['performancedata']
    else:
        data = data + 'HJ'
    # new columns
    row_dict['LC_CODE'] = data
    row_dict['CD_CD'] = 123
    row_dict['GBL'] = 123.345
    if row_dict["created_date"]:
        row_dict["created_date"] = datetime.datetime.strptime(row_dict["created_date"], '%Y-%m-%d')
    if row_dict["performancedata"]:
        row_dict["performancedata"] = ConvertToDec(row_dict["performancedata"])

    newrow = Row(**row_dict)
    return newrow

store_df = spark.read.option("header","true").csv("C:\\STOREDATA.TXT", sep="|")
ratings_rdd = store_df.rdd
ratings_rdd_new = ratings_rdd.map(lambda row: rowwise_function(row))
updatedDF=spark.createDataFrame(ratings_rdd_new)

Basically, I am creating an almost entirely new DataFrame. My questions are below:

  1. Is this the right approach?
  2. Since I am mostly changing the schema, is there any other approach?

Upvotes: 0

Views: 951

Answers (1)

Ani Menon

Reputation: 28229

Use Spark DataFrames/SQL instead of the RDD API. You don't need to perform any low-level data operations here; everything is column-level, so DataFrames are easier and more efficient to use.

To create new columns, use .withColumn(<col_name>, <expression/value>) (refer). All of the ifs can be turned into .filter (refer).
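As a rough sketch of that column-level style (using when/otherwise for the per-row conditionals, since the question's ifs pick values rather than drop rows; column names come from the sample data above, and the LC_CODE expression is only illustrative, not the question's full logic):

from pyspark.sql import functions as F

# build the new columns with expressions instead of an RDD map
updatedDF = (
    store_df
    # conditional value via when/otherwise instead of row-wise ifs
    .withColumn("LC_CODE",
                F.when(F.col("code") != "", F.lit("CDHJ")).otherwise(F.lit("d")))
    .withColumn("CD_CD", F.lit(123))
    .withColumn("GBL", F.lit(123.345))
    # string -> date
    .withColumn("created_date", F.to_date("created_date", "yyyy-MM-dd"))
)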

The whole ConvertToDec can be written more simply using strip and the ast module or float.
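For instance, a simplified converter along those lines (the trailing-minus convention and the -9999.9999 fallback are taken from the question's code; wrapping it in a udf is just one way to plug it into the DataFrame approach):

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

def convert_to_dec(s):
    # values such as '0015.5126-' mark negatives with a trailing minus
    s = (s or "").strip()
    if not s:
        return None
    try:
        return -float(s[:-1]) if s.endswith("-") else float(s)
    except ValueError:
        return -9999.9999  # fallback value used in the question

convert_to_dec_udf = F.udf(convert_to_dec, DoubleType())

updatedDF = updatedDF.withColumn("performancedata", convert_to_dec_udf("performancedata"))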

Upvotes: 1
