gpanda

Reputation: 875

Parse PySpark RDD of Key/Value Pairs to .csv Format

I am building a parser that accepts a raw text file of "key"="value" pairs and writes to a tabular/.csv structure with PySpark.

Here is where I am stuck. I can access the keys and values within a function to construct each csv_row, and even check whether the keys match a list of expected keys (col_list). But because I am calling that function, processRecord, within a lambda, I don't know how to append each csv_row to the global list of lists l_of_l, which is intended to hold the final list of .csv rows.

How can I iterate over each record of an RDD in key/value format and parse to a .csv format? As you can see, my final list of lists (l_of_l) is empty, but I can get each row within the loop... frustrating.

All suggestions appreciated!

The raw text structure (foo.log):

"A"="foo","B"="bar","C"="baz"
"A"="oof","B"="rab","C"="zab"
"A"="aaa","B"="bbb","C"="zzz"

Approach so far:

from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql import Row

sc=SparkContext('local','foobar')
sql = SQLContext(sc)

# Read raw text to RDD
lines=sc.textFile('foo.log')
records=lines.map(lambda x: x.replace('"', '').split(","))

print 'Records pre-transform:\n'
print records.take(100)
print '------------------------------\n'

def processRecord(record, col_list):    
    csv_row=[]
    for idx, val in enumerate(record):
        key, value = val.split('=')        
        if(key==col_list[idx]):
            # print 'Col name match'
            # print value
            csv_row.append(value)
        else:
            csv_row.append(None)
            print 'Key-to-Column Mismatch, dropping value.'
    print csv_row
    global l_of_l
    l_of_l.append(csv_row)

l_of_l=[]
colList=['A', 'B', 'C']
records.foreach(lambda x: processRecord(x, col_list=colList))

print 'Final list of lists:\n'
print l_of_l

Output:

Records pre-transform:
[[u'A=foo', u'B=bar', u'C=baz'], [u'A=oof', u'B=rab', u'C=zab'], [u'A=aaa', u'B=bbb', u'C=zzz']]
------------------------------

[u'foo', u'bar', u'baz']
[u'oof', u'rab', u'zab']
[u'aaa', u'bbb', u'zzz']

Final list of lists:
[]

Upvotes: 1

Views: 1549

Answers (1)

desertnaut

Reputation: 60317

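foreach runs your function on the executors, and each of them works on its own copy of l_of_l, so the appends never reach the list on the driver. Return the row from the function instead of appending to a global. Try this function: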

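def processRecord(record, col_list):
    # Build one CSV row from a list of 'key=value' strings.
    csv_row = []
    for idx, val in enumerate(record):
        # Split on the first '=' only, so a value may itself contain '='.
        key, value = val.split('=', 1)
        if key == col_list[idx]:
            csv_row.append(value)
        else:
            # Key does not match the expected column: keep a placeholder.
            csv_row.append(None)
    # Return the row rather than mutating a global, so map() can collect it.
    return csv_row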

And then

colList=['A', 'B', 'C']
l_of_l = records.map(lambda x: processRecord(x, col_list=colList)).collect()

print 'Final list of lists:\n'
print l_of_l

should give

Final list of lists: 
[[u'foo', u'bar', u'baz'], [u'oof', u'rab', u'zab'], [u'aaa', u'bbb', u'zzz']]
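If the goal is actual .csv text rather than a list of lists, the same idea can be pushed one step further. The following is only a sketch, assuming Python 3 and that neither keys nor values contain commas or quotes in the raw file. The helper names parse_line and to_csv_line are made up for illustration, and unlike processRecord this version looks keys up by name instead of by position, so a missing key leaves an empty cell.

```python
import csv
import io


def parse_line(line, col_list):
    """Turn one raw '"K"="V",...' line into values ordered by col_list.

    Keys are matched by name (not by position); a column whose key is
    absent from the line yields None.
    """
    pairs = {}
    for field in line.replace('"', '').split(','):
        # partition splits on the first '=' only
        key, _, value = field.partition('=')
        pairs[key] = value
    return [pairs.get(col) for col in col_list]


def to_csv_line(row):
    """Format one row as a single CSV line (no trailing newline).

    The csv module writes None as an empty field and quotes values
    when needed.
    """
    buf = io.StringIO()
    csv.writer(buf, lineterminator='').writerow(row)
    return buf.getvalue()


if __name__ == '__main__':
    cols = ['A', 'B', 'C']
    for raw in ['"A"="foo","B"="bar","C"="baz"', '"A"="oof","C"="zab"']:
        print(to_csv_line(parse_line(raw, cols)))
```

With the lines RDD from the question, something like lines.map(lambda l: to_csv_line(parse_line(l, colList))).saveAsTextFile('out_dir') would write the rows from the executors without collecting them to the driver first; out_dir is a placeholder path.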

Upvotes: 1
