Ironman

Reputation: 1410

Pickle error in PySpark

I am trying to parse XML in PySpark. I have a directory containing many small XML files; I want to parse all of them and write the extracted values to HDFS. For that I have written the code below.

Code:

import xml.etree.ElementTree as ET
from subprocess import Popen, PIPE
import pickle

# (path, content) pairs for every XML file in the directory
filenme = sc.wholeTextFiles("/user/root/CD")

# hadoop fs -put reading from stdin, started once on the driver
dumpoff1 = Popen(["hadoop", "fs", "-put", "-", "/user/cloudera/Demo/Demo.txt"], stdin=PIPE)

def getname(filenm):
    # keep only the file content, drop the path
    return filenm[1]

def add_hk(filenm):
    source = []
    global dumpoff1
    doc = ET.fromstring(filenm)
    for elem1 in doc.findall('.//documentInfo/source'):
        source.append(elem1.text)
        print source[0]
        dumpoff1.stdin.write("%s\n" % source[0])

filenme.map(getname).foreach(add_hk)

But when I run this, I get the error below.

Error:

File "/opt/cloudera/parcels/CDH-5.11.0-1.cdh5.11.0.p0.34/lib/spark/python/pyspark/cloudpickle.py", line 582, in save_file raise pickle.PicklingError("Cannot pickle files that are not opened for reading") pickle.PicklingError: Cannot pickle files that are not opened for reading

I tried creating the Popen inside add_hk; then I do not get the pickle error, but Demo.txt is overwritten each time and ends up holding only the last file's values. Please help.
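As I understand it, foreach ships add_hk to the executors, and cloudpickle then has to serialize the global dumpoff1, whose stdin handle (opened for writing) cannot be pickled. For reference, a minimal sketch of the same extraction done without the driver-side pipe, letting Spark write to HDFS itself (the output directory name is hypothetical, and the result is a directory of part files rather than a single Demo.txt):

import xml.etree.ElementTree as ET

def extract_sources(content):
    # one record per documentInfo/source element
    doc = ET.fromstring(content)
    return [elem.text for elem in doc.findall('.//documentInfo/source')]

(sc.wholeTextFiles("/user/root/CD")
   .map(lambda kv: kv[1])                           # keep the content, drop the path
   .flatMap(extract_sources)
   .saveAsTextFile("/user/cloudera/Demo/sources"))  # hypothetical output dir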

Upvotes: 0

Views: 2155

Answers (1)

MaFF

Reputation: 10086

You should load your XML files using Spark SQL and then write them to HDFS:

Assuming /user/root/CD/ is a local path (otherwise remove the file://):

df = spark.read.format('com.databricks.spark.xml').options(rowTag='page').load('file:///user/root/CD/*')

You can then write it to HDFS as Parquet:

df.write.parquet([HDFS path])
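Putting it together for this question, a sketch assuming the com.databricks:spark-xml package is on the classpath (version matching your Scala/Spark build), that rowTag='page' matches the actual documents, and that documentInfo comes through as a struct column; the output path is hypothetical:

df = spark.read.format('com.databricks.spark.xml') \
    .options(rowTag='page') \
    .load('file:///user/root/CD/*')

# keep only the extracted field and write it to HDFS as Parquet
df.select('documentInfo.source').write.parquet('/user/cloudera/Demo/parquet')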

Upvotes: 1
