Reputation: 734
I've written a Luigi pipeline to extract 1.2 million files and then do some sed work on them - see https://gist.github.com/wkerzendorf/395c85a2955002412be302d708329f7f.
If I run this through Luigi on a few thousand files, it schedules fine. But when I run it on the whole dataset, it fails with Failed connecting to remote scheduler. I'm not sure I'm doing this the right way.
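The structure is roughly one Luigi task per input file (a minimal sketch of that pattern; the class names and the glob path here are illustrative, not the actual gist code):

import luigi
from glob import glob


class ExtractFile(luigi.Task):
    # one task instance per input file
    filename = luigi.Parameter()

    def output(self):
        # hypothetical output path
        return luigi.LocalTarget(self.filename + '.extracted')

    def run(self):
        pass  # extract the file and apply the sed-style edits


class ExtractAll(luigi.WrapperTask):
    def requires(self):
        # ~1.2 million requirements, all registered with the scheduler
        return [ExtractFile(filename=f) for f in glob('/1002/*.gz')]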
Upvotes: 3
Views: 1458
Reputation: 8290
I would advise against creating a separate task for each file when there are going to be more than about a thousand of them. You will probably have better luck creating a single batch task that runs on a whole directory of those files. That task can then use multiprocessing to parallelize the calls to your processing function.
from functools import partial
from glob import glob
from multiprocessing import Pool, cpu_count
import os

import luigi


def extract_file(filename, tmpdir):
    # extract a single file into tmpdir (extraction logic not shown);
    # defined at module level so multiprocessing can pickle it
    pass


class TestTask(luigi.Task):
    inglob = luigi.Parameter(default='/1002/*.gz')
    outdir = luigi.Parameter(default='/1002-out/')
    tmpdir = luigi.Parameter(default='/1002-tmp/')

    def output(self):
        return luigi.LocalTarget(self.outdir)

    def run(self):
        os.makedirs(self.tmpdir)
        # one worker per CPU core; each call extracts one file
        with Pool(cpu_count()) as p:
            p.map(partial(extract_file, tmpdir=self.tmpdir), glob(self.inglob))
        # rename only after every file succeeds, so the output target
        # appears atomically and a crashed run never looks complete
        os.rename(self.tmpdir, self.outdir)
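With a single batch task instead of 1.2 million, the scheduler has only one item to track. Assuming the code above is saved in a module called batch_extract (the module name is illustrative), you could run it with:

luigi --module batch_extract TestTask --local-scheduler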
Upvotes: 1