Reputation: 1410
I am trying to parse XML in PySpark. I have a directory containing many small XML files; I want to parse all of them and write the extracted values to HDFS. For that I have written the code below.
Code:
import xml.etree.ElementTree as ET
from subprocess import Popen, PIPE
import pickle

filenme = sc.wholeTextFiles("/user/root/CD")
dumpoff1 = Popen(["hadoop", "fs", "-put", "-", "/user/cloudera/Demo/Demo.txt"], stdin=PIPE)

def getname(filenm):
    return filenm[1]

def add_hk(filenm):
    source = []
    global dumpoff1
    doc = ET.fromstring(filenm)
    for elem1 in doc.findall('.//documentInfo/source'):
        source.append(elem1.text)
    print source[0]
    dumpoff1.stdin.write("%s\n" % source[0])

filenme.map(getname).foreach(add_hk)
But when I run this, I get the error below.
Error:
File "/opt/cloudera/parcels/CDH-5.11.0-1.cdh5.11.0.p0.34/lib/spark/python/pyspark/cloudpickle.py", line 582, in save_file raise pickle.PicklingError("Cannot pickle files that are not opened for reading") pickle.PicklingError: Cannot pickle files that are not opened for reading
I tried creating the Popen inside add_hk; then I no longer get the pickle error, but Demo.txt gets overwritten each time and ends up with only the last file's value. Please help.
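The PicklingError is raised because the driver-side dumpoff1 pipe is captured in the closure that Spark serializes and ships to the executors. A minimal sketch of one workaround, assuming the extracted source values are small enough to collect back to the driver, is to do the parsing on the executors and keep a single hadoop fs -put pipe on the driver (hadoop fs -put fails if Demo.txt already exists, so add -f if you need to overwrite):

import xml.etree.ElementTree as ET
from subprocess import Popen, PIPE

def extract_sources(content):
    # parse one XML document and return all documentInfo/source values
    doc = ET.fromstring(content)
    return [elem.text for elem in doc.findall('.//documentInfo/source')]

# parse on the executors, collect only the extracted strings to the driver
sources = (sc.wholeTextFiles("/user/root/CD")
             .flatMap(lambda kv: extract_sources(kv[1]))
             .collect())

# a single driver-side pipe, so no file-like object is shipped to the executors
dumpoff1 = Popen(["hadoop", "fs", "-put", "-", "/user/cloudera/Demo/Demo.txt"], stdin=PIPE)
for s in sources:
    dumpoff1.stdin.write("%s\n" % s)
dumpoff1.stdin.close()
dumpoff1.wait()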
Upvotes: 0
Views: 2155
Reputation: 10086
You should load your XML files with Spark SQL (via the spark-xml package) and then write them to HDFS.

Assuming /user/root/CD/ is a local path (otherwise remove the file:// prefix):
df = spark.read.format('com.databricks.spark.xml').options(rowTag='page').load('file:///user/root/CD/*')
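If you want to inspect what was parsed before writing it out, something like the following works (a sketch only: rowTag='page' and a nested documentInfo.source column are assumptions that depend on your actual XML layout):

df.printSchema()

# if documentInfo is inferred as a struct column, the nested source
# element can be selected with dot notation
df.select('documentInfo.source').show(truncate=False)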
You can then write it out as Parquet:
df.write.parquet([HDFS path])
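If you would rather end up with plain text in HDFS (like Demo.txt in the question), a single string column can also be written as text; note that Spark writes a directory of part files rather than one file, and the column name here is only an assumption based on the question's XML:

df.select('documentInfo.source').write.text('/user/cloudera/Demo/sources')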
Upvotes: 1