Carl Rynegardh
Carl Rynegardh

Reputation: 558

PySpark - Loading and saving path to files, some corrupt

I'm rather new to PySpark and I am trying to load, and preprocess some data. However, some of the data seems to corrupted and I need to handle these errors, together with finding out which of the files that are corrupt.

def load_audio(path, sr = 22050):
    return librosa.load(path,sr=sr)

rdd = sc.parallelize(paths) #Path to all files
audio = rdd.map(lambda path: (path,load_audio(os.path.join(std_audio_path,path)))) # Loads all the audio files

As soon as load_audio tries to load a corrupted file, the program crashes. I realize a try and catch could work but I can not see how it solves my problem in this case.

What I ideally would like is to load all files that are not corrupted. I want the path of the files that are corrupted/throws an error when loading to go into an rdd/list of its own.

How can I achieve this? :)

Upvotes: 0

Views: 574

Answers (1)

Bunyk
Bunyk

Reputation: 8067

Try/except solves your problem easily. We could use oldschool/Go mindset here and instead exceptions function would return error codes or something like that with the actual result.

I do not have your files to reproduce, so I would create similar error by using function that just checks path and throws exception when file is mp3. Let's say I have list of files:

import random
paths = [str(i) + '.' + random.choice(('mp3', 'ogg')) for i in xrange(100)]
std_audio_paths = '/'

def load_audio(path, sr=22050):
    if path.endswith('mp3'):
        raise IOError('Unknown format')
    return 'audio data'




rdd = sc.parallelize(paths)
audio = rdd.map(lambda path: (path,load_audio(os.path.join(std_audio_path,path))))
audio.collect()
# will break because load_audio throws exception

So I would use try/except to create wrapper around load_audio that does not raise exception, but returns error:

def try_to_load_audio(path):
    try:
        return load_audio(path), None
    except IOError as e:
        return '', 'error'

audio = rdd.map(lambda path: (path,try_to_load_audio(os.path.join(std_audio_path,path))))
audio.collect()
# returns collection of pairs  (path, (result, error)):
# [('0.mp3', ('', 'error')), ('1.ogg', ('audio data', None)), ...

Now to the second part of your question. Instead of collect(), do audio.cache(), so you won't load files twice, and then:

failed = audio.filter(lambda f: f[1][1] == 'error').map(lambda f: f[0])
failed.collect()
# gives paths only of failed files (mp3 in my case):
# ['0.mp3', '3.mp3', ...

And to have only loaded audio data you do:

loaded_data = audio.filter(lambda f: f[1][1] is None).map(lambda f: f[1][0])
loaded_data.collect()
# Gives ['audio data', 'audio data',...

Upvotes: 2

Related Questions