Wolfgang Kerzendorf

Reputation: 734

Scheduling very large amounts of jobs on python luigi

I've written a Luigi pipeline to extract 1.2 million files and then do some sed work on them; see https://gist.github.com/wkerzendorf/395c85a2955002412be302d708329f7f.

If I run this through Luigi on a few thousand files, it schedules fine, but when I run it on the whole dataset it fails with "Failed connecting to remote scheduler". I'm not sure if I'm doing this the right way.
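The pipeline is structured as one task per input file, roughly like this (a simplified sketch; the class name and details here are illustrative, the real code is in the gist above):

import luigi

class ExtractAndSed(luigi.Task):
    # one task instance per input file (simplified; the actual
    # pipeline is in the gist linked above)
    filename = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(self.filename + '.out')

    def run(self):
        # extract the file and apply the sed-style edits (omitted)
        ...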

Upvotes: 3

Views: 1458

Answers (1)

MattMcKnight

Reputation: 8290

I would advise against creating a separate task for each file when there are going to be more than about a thousand of them. You will probably have better luck creating a single batch task that runs on a directory of those files. That task can then use multiprocessing to make parallel calls to your processing function, as sketched below.

from glob import glob
from multiprocessing import Pool, cpu_count
import os

import luigi

class TestTask(luigi.Task):
    # note: a plain Task, not WrapperTask -- a WrapperTask with no
    # requirements reports itself complete and run() never fires
    inglob = luigi.Parameter(default='/1002/*.gz')
    outdir = luigi.Parameter(default='/1002-out/')
    tmpdir = luigi.Parameter(default='/1002-tmp/')

    def extract_file(self, filename):
        # extract a single file into self.tmpdir (not shown)
        pass

    def output(self):
        return luigi.LocalTarget(self.outdir)

    def run(self):
        os.makedirs(self.tmpdir)
        # fan the per-file work out over all cores inside this one task
        with Pool(cpu_count()) as p:
            p.map(self.extract_file, glob(self.inglob))
        # rename at the end so the output target only appears on success
        os.rename(self.tmpdir, self.outdir)
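Since the whole directory is handled by one task, the scheduler only has to track a single task instead of 1.2 million. One way to kick it off (a minimal sketch) is with the local scheduler:

import luigi

if __name__ == '__main__':
    # only one batch task to schedule, so the local scheduler suffices
    luigi.build([TestTask()], local_scheduler=True)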

Upvotes: 1
