Reputation: 69
I want to use Python to scan a folder containing hundreds of .sas and .egp files.
The .sas files are not a problem, since I can simply read them like this:
with open(file_path, 'r', encoding='latin-1') as f:
    ...
That does not work for .egp files, so I use the following function to extract the scripts from the archive and then read them:
import os
import zipfile

def extract_sas_scripts_from_egp(file_path):
    scripts = []
    extraction_dir = file_path.replace('.egp', '_extracted')
    # Reset Extraction Folder
    if os.path.exists(extraction_dir):
        for file in os.listdir(extraction_dir):
            os.remove(os.path.join(extraction_dir, file))
    else:
        os.makedirs(extraction_dir)
    # Extract .sas files from the archive
    with zipfile.ZipFile(file_path, 'r') as archive:
        for file_info in archive.infolist():
            if file_info.filename.endswith(".sas"):
                extracted_path = os.path.join(extraction_dir, os.path.basename(file_info.filename))
                with archive.open(file_info) as sas_file:
                    content = sas_file.read().decode('latin-1')
                scripts.append((content, extracted_path))
                with open(extracted_path, 'w', encoding='latin-1') as f:
                    f.write(content)
    return scripts
My problem is that the generated "_extracted" folder seems to contain only the first .sas script, leaving out all the others I also need.
Thank you!
Upvotes: 0
Views: 92
Reputation: 2776
I think this is because all the sas files embedded in an EGP file are named "code.sas". So I expect the extracted file keeps getting overwritten by your code.
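The overwriting can be reproduced on a small in-memory archive. This is only a sketch: the folder names `CodeTask-aaa` and `CodeTask-bbb` are made up, and a real EGP file will use its own identifiers.

```python
import io
import os
import zipfile

# Build a tiny in-memory archive that mimics the layout described above.
# The folder names are hypothetical placeholders.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("CodeTask-aaa/code.sas", "data a; run;")
    archive.writestr("CodeTask-bbb/code.sas", "data b; run;")

with zipfile.ZipFile(buffer, "r") as archive:
    members = [name for name in archive.namelist() if name.endswith(".sas")]

# os.path.basename drops the folder, so both members map to the same file name
basenames = [os.path.basename(name) for name in members]
print(members)
print(basenames)
```

Because every member reduces to the same base name, each write in the question's loop lands on the same path in the "_extracted" folder.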
Each of the "code.sas" files is in a separate folder inside the EGP archive. To get the relevant folders, you need to parse the project.xml file in the top level of the archive:
Look for <NodeType> elements containing the text NODETYPE_PROGRAMFOLDER.
These represent process flows and the <EGTreeNode> siblings of this element represent SAS programs.
Within these <EGTreeNode> elements, nodes named <ElementID> should contain the folder name where the code.sas file is located. And nodes named <Label> should contain the program name. (You can use this to rename code.sas after you've extracted it.)
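The lookup described above can be sketched with the standard library's `xml.etree.ElementTree`. The sample XML is invented and heavily simplified (a real project.xml contains far more), and the function name `list_embedded_programs` is hypothetical. One difference to note: this sketch takes every <EGTreeNode> child of the program folder node, not only the ones that follow the <NodeType> element.

```python
import xml.etree.ElementTree as ET

# Invented, simplified stand-in for project.xml (an assumption, not real output)
SAMPLE_PROJECT_XML = """
<ProjectCollection>
  <EGTreeNode>
    <Label>Process Flow</Label>
    <EGTreeNode>
      <NodeType>NODETYPE_PROGRAMFOLDER</NodeType>
      <Label>Programs</Label>
      <EGTreeNode>
        <ElementID>CodeTask-aaa</ElementID>
        <Label>import_data</Label>
      </EGTreeNode>
      <EGTreeNode>
        <ElementID>CodeTask-bbb</ElementID>
        <Label>clean_data</Label>
      </EGTreeNode>
    </EGTreeNode>
  </EGTreeNode>
</ProjectCollection>
"""


def list_embedded_programs(project_xml_text):
    """Return (folder, program name) pairs found under program folder nodes."""
    root = ET.fromstring(project_xml_text)
    programs = []
    for parent in root.iter():
        # A program folder node has a NodeType child with the marker text
        is_program_folder = any(
            child.tag == "NodeType"
            and (child.text or "").strip() == "NODETYPE_PROGRAMFOLDER"
            for child in parent
        )
        if not is_program_folder:
            continue
        # The EGTreeNode children of that node (siblings of NodeType) are programs
        for node in parent.findall("EGTreeNode"):
            element_id = node.findtext("ElementID")
            label = node.findtext("Label")
            if element_id and label:
                programs.append((element_id, label))
    return programs


programs = list_embedded_programs(SAMPLE_PROJECT_XML)
print(programs)
```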
I have used this method to extract SAS programs from EGP files for Enterprise Guide 7.1 and 5.1 (for version 5.1, I seem to remember the <Label> contains the full file path instead of the program name, so I had to do a bit more processing for those). Enterprise Guide 4.1 files have a different structure, so need a slightly different process.
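If a <Label> does hold a full Windows path, as I recall for version 5.1, the program name could be recovered along these lines. This is a sketch built on that recollection: the helper name `program_name_from_label` and the example path are made up.

```python
import ntpath


def program_name_from_label(label):
    """Reduce a label to a bare program name (hypothetical helper)."""
    # ntpath parses Windows-style paths on any operating system
    base = ntpath.basename(label)
    name, ext = ntpath.splitext(base)
    # Only strip the extension when it really is .sas
    return name if ext.lower() == ".sas" else base


print(program_name_from_label(r"C:\projects\reports\monthly.sas"))
print(program_name_from_label("monthly"))
```

A label that is already a plain program name passes through unchanged, so the same helper could be applied to both versions.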
I have done this using R and SAS code, but unfortunately not with Python, so I can't recommend any packages or functions for parsing the XML.
Edit: An attempt at a Python function for this task
This approach creates a sub-folder for each process flow. Otherwise there would be a problem if two process flows contained SAS programs with the same name.
I suspect this can be done much more elegantly (without all the re-naming and deleting), but this version has worked on a selection of Enterprise Guide 7.1 projects:
import os
import shutil
import zipfile

import lxml.etree as ET


def extract_sas_scripts_from_egp(file_path):
    extraction_dir = file_path.replace('.egp', '_extracted')
    # Reset the extraction folder. rmtree also removes the process flow
    # sub-folders left by a previous run, which os.remove cannot delete.
    if os.path.exists(extraction_dir):
        shutil.rmtree(extraction_dir)
    os.makedirs(extraction_dir)
    # Reference to the EGP file (closed automatically at the end of the block)
    with zipfile.ZipFile(file_path, "r") as egp:
        # Read the info from project.xml
        with egp.open("project.xml") as project_xml:
            tree = ET.parse(project_xml)
        # Extract the process flows
        process_flows = [elem for elem in tree.iter() if elem.text == "NODETYPE_PROGRAMFOLDER"]
        # Extract the elements relating to process flows
        programs = [elem for flow in process_flows for elem in flow.xpath("following-sibling::EGTreeNode")]
        # ElementID elements contain the name of each folder inside the EGP containing code.sas
        folders = [element.text for program in programs for element in program.xpath("ElementID")]
        # Label elements contain the program name (used to rename the extracted files later)
        names = [element.text for program in programs for element in program.xpath("Label")]
        # Unique names of process flows (used to create directories later)
        pfnames_unique = [elem.text for process in process_flows for elem in
                          process.xpath("./parent::EGTreeNode/parent::EGTreeNode/Label")]
        # Names of process flows (one for each program)
        pfnames = [elem.text for program in programs for elem in
                   program.xpath("./parent::EGTreeNode/parent::EGTreeNode/Label")]
        # Path to code.sas files within the EGP archive
        sasfiles = [folder + "/code.sas" for folder in folders]
        # The relevant process flow folder to extract each code.sas file to
        extracted_folders = [os.path.join(extraction_dir, pf) for pf in pfnames]
        # The path to the extracted code.sas files (used to rename these later)
        extracted_files = [os.path.join(extraction_dir, pf, sasfile) for sasfile, pf in zip(sasfiles, pfnames)]
        # The desired paths and names for the extracted sas files
        extracted_files_rename = [os.path.join(extraction_dir, pf, name + ".sas") for name, pf in zip(names, pfnames)]
        # List of extracted directories to be deleted at the end
        new_dirs = [os.path.join(extraction_dir, pf, fol) for pf, fol in zip(pfnames, folders)]
        # New directories to create (one for each process flow)
        new_dirs_unique = [os.path.join(extraction_dir, pf) for pf in pfnames_unique]
        # Debug output
        print(sasfiles)
        print(extracted_folders)
        print(extracted_files)
        print(extracted_files_rename)
        print(new_dirs)
        print(new_dirs_unique)
        # Create the directories for each process flow in the output directory
        for new_dir in new_dirs_unique:
            os.makedirs(new_dir, exist_ok=True)
        # Extract the code.sas files. These will be extracted along with their parent
        # directory inside the archive, so we will have to rename them later, and delete the directory.
        # The try...except is for cases where there is no code.sas file - this happens if a program is
        # linked rather than embedded in the EGP (ZipFile.extract raises KeyError for a missing member)
        for file, newfile in zip(sasfiles, extracted_folders):
            try:
                egp.extract(file, newfile)
            except KeyError:
                print("Unable to extract " + file)
    # Rename to move the sas files to their correct location
    for oldname, newname in zip(extracted_files, extracted_files_rename):
        try:
            os.rename(oldname, newname)
        except OSError:
            # Nothing to rename if the program could not be extracted
            pass
    # Delete the unwanted directories
    for unwanted_dir in new_dirs:
        try:
            shutil.rmtree(unwanted_dir)
        except OSError:
            pass
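The renaming and deleting can probably be avoided by reading each code.sas member straight from the archive and writing its bytes to the final path. The following is a sketch under assumptions, not a tested replacement: `write_programs` is a hypothetical helper, it expects the (process flow, folder, program name) triples to have been collected from project.xml already, and the demo archive and names are invented. It has not been run against real Enterprise Guide projects.

```python
import io
import os
import tempfile
import zipfile


def write_programs(egp_file, output_dir, programs):
    """Write each embedded program to output_dir/<flow>/<program>.sas.

    programs is an iterable of (flow_name, folder, program_name) tuples.
    """
    written = []
    with zipfile.ZipFile(egp_file, "r") as egp:
        members = set(egp.namelist())
        for flow_name, folder, program_name in programs:
            member = folder + "/code.sas"
            if member not in members:
                # Linked (not embedded) programs have no code.sas in the archive
                continue
            target_dir = os.path.join(output_dir, flow_name)
            os.makedirs(target_dir, exist_ok=True)
            target = os.path.join(target_dir, program_name + ".sas")
            # Copy the raw bytes so no encoding has to be guessed
            with open(target, "wb") as out:
                out.write(egp.read(member))
            written.append(target)
    return written


# Demo on a synthetic archive (folder and program names are made up)
demo_archive = io.BytesIO()
with zipfile.ZipFile(demo_archive, "w") as archive:
    archive.writestr("CodeTask-aaa/code.sas", "data a; run;")
    archive.writestr("CodeTask-bbb/code.sas", "data b; run;")

demo_programs = [
    ("Process Flow", "CodeTask-aaa", "import_data"),
    ("Process Flow", "CodeTask-bbb", "clean_data"),
    ("Process Flow", "CodeTask-ccc", "linked_program"),  # not in the archive
]

with tempfile.TemporaryDirectory() as demo_dir:
    written = write_programs(demo_archive, demo_dir, demo_programs)
    written_names = sorted(os.path.basename(path) for path in written)

print(written_names)
```

Program names taken from <Label> may contain characters that are not valid in file names, so some sanitising might still be needed before using them as paths.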
Upvotes: 1