Reputation: 4797
I have a program that can be started or stopped at any moment. The program is used to download data from web pages. First, a user will define a bunch of web pages in a .csv file, save that .csv file, and then start the program. The program will read the .csv file and turn it into a list of jobs. Next, the jobs are split among 5 separate downloader functions that work in parallel but may take different amounts of time to download.
After a downloader (there are 5 of them) has finished downloading a web page, I need it to open the .csv file and remove the link. This way, as time passes, the .csv file will get smaller and smaller. The issue is that sometimes two downloader functions will try to update the .csv file at the same time, which causes the program to crash. How can I deal with this?
Upvotes: 5
Views: 6016
Reputation: 25769
If this is a continuation of your project from yesterday, you already have your download list in memory - just remove the entries from the loaded list as their processes finish downloading, and only write the whole list back over the input file once you're exiting the 'downloader'. There is no reason to constantly write down the changes.
If you want to know (say, from an external process) when a URL gets downloaded even while your 'downloader' is running, write a new line to a downloaded.dat file each time a process reports that a download was successful.
Of course, in both cases, write from within your main process/thread so you don't have to worry about mutexes.
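For the first approach, a minimal sketch could look like this (the download() worker and the input_links.csv name are hypothetical placeholders; only the main process ever touches the file):
from multiprocessing import Pool

def download(link):  # hypothetical worker - replace with your actual downloader
    # ... fetch and store the page here ...
    return link  # return the link so the main process knows which job finished

if __name__ == "__main__":
    with open("input_links.csv") as f:  # load the whole list once
        links = [row.strip() for row in f if row.strip()]

    remaining = set(links)
    pool = Pool(processes=5)
    try:
        # results arrive in completion order, one per finished download
        for finished in pool.imap_unordered(download, links):
            remaining.discard(finished)  # update only the in-memory list
    finally:
        pool.close()
        pool.join()
        with open("input_links.csv", "w") as f:  # write back once, from the main process
            f.writelines(link + "\n" for link in remaining)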
UPDATE - Here's how to do it with an additional file, using the same code base as yesterday:
from itertools import cycle       # round-robin distribution of downloader configs
from multiprocessing import Pool  # pool of worker processes
# Downloader is the class from yesterday's code base

def init_downloader(params):  # our downloader initializator
    downloader = Downloader(**params[0])  # instantiate our downloader
    downloader.run(params[1])  # run our downloader
    return params  # job finished, return the same params for identification

if __name__ == "__main__":  # important protection for cross-platform use
    downloader_params = [  # Downloaders will be initialized using these params
        {"port_number": 7751},
        {"port_number": 7851},
        {"port_number": 7951}
    ]
    downloader_cycle = cycle(downloader_params)  # use a cycle for round-robin distribution
    with open("downloaded_links.dat", "a+") as diff_file:  # open your diff file
        diff_file.seek(0)  # rewind the diff file to the beginning to capture all lines
        diff_links = {row.strip() for row in diff_file}  # load downloaded links into a set
        with open("input_links.dat", "r+") as input_file:  # open your input file
            available_links = []
            download_jobs = []  # store our downloader parameters + a link here
            # read our file line by line and filter out downloaded links
            for row in input_file:  # loop through our file
                link = row.strip()  # remove the extra whitespace to get the link
                if link not in diff_links:  # make sure link is not already downloaded
                    available_links.append(row)
                    download_jobs.append([next(downloader_cycle), link])
            input_file.seek(0)  # rewind our input file
            input_file.truncate()  # clear out the input file
            input_file.writelines(available_links)  # store back the available links
        diff_file.seek(0)  # rewind the diff file
        diff_file.truncate()  # blank out the diff file now that the input is updated
        # and now let's get to business...
        if download_jobs:
            download_pool = Pool(processes=5)  # make our pool use 5 processes
            # run asynchronously so we can capture results as soon as they are available
            for response in download_pool.imap_unordered(init_downloader, download_jobs):
                # since it returns the same parameters, the second item is a link
                # add the link to our `diff` file so it doesn't get downloaded again
                diff_file.write(response[1] + "\n")
        else:
            print("Nothing left to download...")
The whole idea is, as I wrote in the comment, to use a file to store downloaded links as they get downloaded, and then on the next run to filter out the downloaded links and update the input file. That way even if you forcibly kill it, it will always resume where it left off (except for the partial downloads).
Upvotes: 4
Reputation: 519
Use a Lock from the multiprocessing library to serialize operations on the file.
You will want to pass the lock into each process. Each process should acquire the lock before it opens the file and release it after it closes the file.
https://docs.python.org/2/library/multiprocessing.html
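A minimal sketch of that approach (the links.csv name and the download step are hypothetical placeholders; the lock is handed to each worker through the pool initializer, since a multiprocessing Lock can't be passed as a regular task argument):
from multiprocessing import Pool, Lock

def init_worker(l):
    # make the lock available in every worker process
    global lock
    lock = l

def download_and_update(link):
    # ... download the page here ...
    lock.acquire()
    try:
        # only one process at a time may rewrite the .csv file
        with open("links.csv") as f:
            remaining = [row for row in f if row.strip() != link]
        with open("links.csv", "w") as f:
            f.writelines(remaining)
    finally:
        lock.release()

if __name__ == "__main__":
    with open("links.csv") as f:
        links = [row.strip() for row in f if row.strip()]
    lock = Lock()
    pool = Pool(processes=5, initializer=init_worker, initargs=(lock,))
    pool.map(download_and_update, links)
    pool.close()
    pool.join()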
Upvotes: 1
Reputation: 360
Look into locking files in Python. Locking a file will make the next process wait until the file is unlocked before it can modify it. File locking is platform specific, so you will have to use whichever method works for the OS you are on. If you need to figure out the OS, use a check on os.name like this.
import os

def my_lock(f):
    if os.name == "posix":
        # Unix or OS X specific locking here (e.g. fcntl)
        pass
    elif os.name == "nt":
        # Windows specific locking here (e.g. msvcrt)
        pass
    else:
        print("Unknown operating system, lock unavailable")
Then I would look at this article and figure out exactly how you want to implement your lock.
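As an illustration, the branches could be filled in roughly like this (a sketch using fcntl on Unix and msvcrt on Windows; adjust the byte range and error handling to your needs):
import os

if os.name == "posix":
    import fcntl
elif os.name == "nt":
    import msvcrt

def my_lock(f):
    # f is an open file object; take an exclusive lock on it
    if os.name == "posix":
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)  # blocks until the lock is free
    elif os.name == "nt":
        msvcrt.locking(f.fileno(), msvcrt.LK_LOCK, 1)  # lock 1 byte at the current position
    else:
        print("Unknown operating system, lock unavailable")

def my_unlock(f):
    if os.name == "posix":
        fcntl.flock(f.fileno(), fcntl.LOCK_UN)
    elif os.name == "nt":
        msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, 1)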
Upvotes: 0