Reputation: 558
I'm rather new to PySpark and I am trying to load and preprocess some data. However, some of the data seems to be corrupted, so I need to handle these errors and also find out which of the files are corrupt.
import os
import librosa

def load_audio(path, sr=22050):
    return librosa.load(path, sr=sr)

rdd = sc.parallelize(paths)  # paths to all files
audio = rdd.map(lambda path: (path, load_audio(os.path.join(std_audio_path, path))))  # loads all the audio files
As soon as load_audio tries to load a corrupted file, the program crashes. I realize a try/except could work, but I cannot see how it solves my problem in this case.
Ideally, I would like to load all files that are not corrupted, and have the paths of the files that are corrupted (or otherwise throw an error when loading) go into an RDD/list of their own.
How can I achieve this? :)
Upvotes: 0
Views: 574
Reputation: 8067
Try/except solves your problem easily. We can borrow the old-school/Go mindset here: instead of raising exceptions, the function returns an error value alongside the actual result.
I do not have your files to reproduce the problem, so I will create a similar error with a function that just checks the path and throws an exception when the file is an mp3. Let's say I have a list of files:
import os
import random

paths = [str(i) + '.' + random.choice(('mp3', 'ogg')) for i in range(100)]
std_audio_path = '/'

def load_audio(path, sr=22050):
    if path.endswith('mp3'):
        raise IOError('Unknown format')
    return 'audio data'

rdd = sc.parallelize(paths)
audio = rdd.map(lambda path: (path, load_audio(os.path.join(std_audio_path, path))))
audio.collect()
# will break because load_audio throws an exception
So I would use try/except to create a wrapper around load_audio that does not raise an exception but returns an error instead:
def try_to_load_audio(path):
    try:
        return load_audio(path), None
    except IOError:
        return '', 'error'

audio = rdd.map(lambda path: (path, try_to_load_audio(os.path.join(std_audio_path, path))))
audio.collect()
# returns collection of pairs (path, (result, error)):
# [('0.mp3', ('', 'error')), ('1.ogg', ('audio data', None)), ...
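If you also want to know why a file failed, the same wrapper idea can keep the exception message instead of a fixed 'error' string. The following is only a sketch: capture_errors and safe_load_audio are hypothetical names that are not part of the answer above, and load_audio is the same toy stand-in rather than the real librosa loader.

```python
import functools

def capture_errors(func, exceptions=(Exception,)):
    """Wrap func so it returns (result, error) instead of raising."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs), None
        except exceptions as e:
            # Keep the exception type and message so the failure
            # can be diagnosed later.
            return None, '{}: {}'.format(type(e).__name__, e)
    return wrapper

# Toy stand-in for the real loader (assumption: mirrors the example above).
def load_audio(path, sr=22050):
    if path.endswith('mp3'):
        raise IOError('Unknown format')
    return 'audio data'

safe_load_audio = capture_errors(load_audio)

print(safe_load_audio('1.ogg'))  # successful load: error slot is None
print(safe_load_audio('0.mp3'))  # failed load: result slot is None
```

The wrapped function should be usable inside rdd.map in the same way as try_to_load_audio; catching every Exception is a deliberate assumption here, and you may prefer to pass a narrower tuple of exception types.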
Now to the second part of your question. Instead of calling collect() right away, call audio.cache() first so the files are not loaded twice, and then:
failed = audio.filter(lambda f: f[1][1] == 'error').map(lambda f: f[0])
failed.collect()
# gives paths only of failed files (mp3 in my case):
# ['0.mp3', '3.mp3', ...
And to get only the loaded audio data:
loaded_data = audio.filter(lambda f: f[1][1] is None).map(lambda f: f[1][0])
loaded_data.collect()
# Gives ['audio data', 'audio data',...
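The two filters above can be bundled into one helper that also keeps each path next to its data or its error. This is a minimal sketch under the assumption that audio is an RDD of (path, (result, error)) pairs as built above; split_loaded_and_failed is a hypothetical name, and it relies only on the RDD methods cache, filter and map.

```python
def split_loaded_and_failed(audio):
    """Split an RDD of (path, (result, error)) pairs into two RDDs.

    Returns (loaded, failed): loaded holds (path, result) pairs and
    failed holds (path, error) pairs.
    """
    # cache() marks the RDD for reuse, so the two filters below should
    # not trigger loading every file twice.
    audio = audio.cache()
    loaded = audio.filter(lambda f: f[1][1] is None).map(lambda f: (f[0], f[1][0]))
    failed = audio.filter(lambda f: f[1][1] is not None).map(lambda f: (f[0], f[1][1]))
    return loaded, failed

# Hypothetical usage, with `audio` built as in the answer above:
# loaded, failed = split_loaded_and_failed(audio)
# failed.collect()   # paths and errors of the files that could not be loaded
# loaded.collect()   # paths and data of the files that loaded fine
```

Keeping the path in the loaded RDD is a design choice of this sketch; drop it with another map if you only need the audio data.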
Upvotes: 2