Reputation: 1445
this is my code
class MusicHandler(object):
""" Implements the logic to download musics """
def __init__(self):
""" Creates the class object, loads configs, absolute paths and create directories. Does not init download """
# create logger
self.logger = getLogger("music logger")
self.logger.info("Initializing MusicHandler class object")
# load config vars
self.configs = utils.get_configs()
# load absolute paths
self.dir_music = self.configs["music_path"]
self.dir_audio = utils.get_path_from_rel(self.configs["audio_path"], base_path=self.dir_music)
self.dir_video = utils.get_path_from_rel(self.configs["video_path"], base_path=self.dir_music)
self.taboo_path = utils.get_path_from_rel(self.configs["taboo_path"])
# make dir if not exists
list(map(utils.make_dir_safe, (self.dir_music, self.dir_audio, self.dir_video)))
@run_safe
def download_one(self, link, keep_video=False):
""" handles the download of one link. keep_video determines if video is saved or deleted. """
self.logger.info(f"Starting download of {link}")
if self.is_taboo(link):
self.logger.warning("Link is taboo, will skip it.")
return
name = self.download_video(link)
self.logger.info(f"Download of {name} (video) was a success.")
self.video_to_audio(name)
self.add_to_taboo(link)
if not keep_video:
self.remove_video(name)
self.logger.critical(f"Successfully downloaded {name}. Available at {self.dir_music}.")
def get_musics_linear(self, url_list):
""" Linear download of all links from list """
for link in url_list:
self.download_one(link)
@run_safe
def get_musics_parallel(self, url_list, num_procs=3):
""" Parallel download of all links from list """
with Pool(num_procs) as p:
print("here")
p.map(self.download_one, url_list)
print("there")
def is_taboo(self, link):
""" Verifies if link exists in the taboo file """
return utils.is_in_file(self.taboo_path, link)
def add_to_taboo(self, link):
""" Adds link to the taboo file """
utils.append_to_file(self.taboo_path, link)
def download_video(self, link):
""" Downloads the highest resolution video given a link. Returns file name """
yt = YouTube(link)
yt.streams.filter(progressive=True).order_by("resolution").desc().first().download(self.dir_video, yt.title)
return utils.safe_filename(yt.title)
def download_audio(self, link):
""" Downloads only the audio of a given link. Returns file name """
yt = YouTube(link)
yt.streams.filter(only_audio=True).first().download(self.dir_audio, yt.title)
return utils.safe_filename(yt.title)
def video_to_audio(self, name):
""" Converts a video to an audio file """
self.logger.info("Converting video to audio.")
video_f_path = os.path.join(self.dir_video, "{}{}".format(name, ".mp4"))
audio_f_path = os.path.join(self.dir_audio, "{}{}".format(name, ".wav"))
cmd_list = ["ffmpeg", "-i", video_f_path, "-f", "wav", "-ab", "19200", "-vn", audio_f_path]
FNULL = open(os.devnull, 'w')
subprocess.call(cmd_list, stdout=FNULL, stderr=subprocess.STDOUT)
def remove_video(self, name):
""" Deletes a video file """
self.logger.info("Removing video file.")
video_f_path = os.path.join(self.dir_video, "{}{}".format(name, ".mp4"))
if os.path.exists(video_f_path):
os.remove(video_f_path)
when running this code, I get the following error:
File "C:\Users\Y\PycharmProjects\pyMusic\lib\pyMusic.py", line 143, in get_musics_parallel p.map(self.download_one, url_list) File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\lib\multiprocessing\pool.py", line 266, in map return self._map_async(func, iterable, mapstar, chunksize).get() File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\lib\multiprocessing\pool.py", line 644, in get raise self._value File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\lib\multiprocessing\pool.py", line 424, in _handle_tasks put(task) File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\lib\multiprocessing\connection.py", line 206, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "C:\Program Files (x86)\Microsoft Visual Studio\Shared\Python36_64\lib\multiprocessing\reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) TypeError: can't pickle _thread.RLock objects
However, if I remove the logger, the code executes without any problem.
Any idea about why the logger is causing this pickle error? The error is on the "get_musics_parallel" function, and occurs on the map(self.download_one, urllist) line.
I tried to google this error, but the questions relative to this pickle error seem not the be related to my problem.
I appreciate your help, Kind regards
Upvotes: 7
Views: 6956
Reputation: 3
I ran into this same error running multiprocessing on Python 3.6.7. When the the logger was removed from the args list, it worked.
Upvotes: 0
Reputation: 1059
I find the same error, but the same code, on a different machine, raises no error.
The difference is: the machine where the code is not working is using python3.6.7 The machine where the same code is working is using python3.7.6
So my answer is: if possible, upgrade to python 3.7.6
Not sure why it works though.
Upvotes: 0
Reputation: 9
I came across this problem when i ran a pyspark job where i use logging module to record the information during the task.
The spark log shows such info"_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects"
Spark will cast the variable to all the workers before computing, however logging object can't be casted which induced such error.
After i remove all the logging usage, the error disappeared.
Wish this tip helps you.
Upvotes: 0
Reputation: 16700
Aren't you using multiprocessing
module somewhere? It requires that data exchanged between threads/processes be pickable.
The logger uses RLock
which is holding the state of the process and as such cannot be pickled. Logger uses it so that it can do the magic of not messing up with the log file (or any other logging output) when there are multiple loggers acquired (or the same one) throughout the script.
Upvotes: 7