MattLBeck

Reputation: 5831

Piping large files using IPC::open2

I have made a Perl script that wraps around another tool (overlapFeatures) so that I can convert my file formats on the fly. The files I am dealing with are all tab-separated tables, typically around 2 million lines each. On its own, overlapFeatures has no trouble dealing with them.

However, I think I'm causing the pipes to lock up by piping so many lines at once. I know I need to somehow thread this so that I can read from and write to the child process at the same time, but I really don't understand how to use threads properly in Perl (or in any other language, for that matter). As I understand it, I can use either threads or IPC::Run to solve my problem.

My original script that ends up deadlocking goes something like this:

use strict;
use warnings;
use IPC::Open2;

my $infile = shift;
my $featurefile = shift;

my $command = 'overlapFeatures';
my @args = (qw (-a stdin -b), $featurefile);

my ($input, $output);
my $pid = open2($output, $input, $command, @args) 
    or die "Failed with error $!\n";

open (my $infh, '<', $infile) or die "Can't open $infile\n";
while (<$infh>){
    # Do some format conversion...
    chomp;
    my @cols = split /\t/;
    # print a modified line to the tool
    print $input join ("\t", @cols[0,2,3,1,5,4]),"\n";
}
close ($input);

while (<$output>){
    # format conversion for output
    chomp;
    my @cols = split /\t/;
    print join (",",@cols[0,1,2,5,3,8]),"\n";
}
close ($output);
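One plausible reading of why this version locks up, assuming overlapFeatures starts emitting results before it has read all of its input: a pipe is a fixed-size kernel buffer (on Linux usually 65,536 bytes, see pipe(7)). Nothing reads $output until the whole input file has been written, so once overlapFeatures' stdout pipe is full it blocks, stops draining its stdin, and print $input then blocks as well. The sketch below is not from the original post; it measures that buffer by filling a pipe that nobody reads. The write end is put in non-blocking mode so the loop stops at EAGAIN instead of hanging the way the script does.

```perl
use strict;
use warnings;
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Errno qw(EAGAIN);

# Count how many bytes a fresh pipe accepts before a write would block.
# Nobody reads $r, just as nobody reads $output while the script above
# is still busy writing its input.
sub pipe_capacity {
    pipe(my $r, my $w) or die "pipe failed: $!\n";
    my $flags = fcntl($w, F_GETFL, 0) or die "F_GETFL failed: $!\n";
    fcntl($w, F_SETFL, $flags | O_NONBLOCK) or die "F_SETFL failed: $!\n";

    my $total = 0;
    while (1) {
        my $n = syswrite($w, "x");
        if (defined $n) {
            $total += $n;
            next;
        }
        last if $! == EAGAIN;    # buffer full: a blocking write would hang here
        die "syswrite failed: $!\n";
    }
    close $w;
    close $r;
    return $total;
}

print "pipe buffer: ", pipe_capacity(), " bytes\n";
```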

I tried rewriting the script to use threads, as described in "How to filter a lot of data with IPC::Open2?", like so:

use strict;
use warnings;
use IPC::Open2;
use threads;

my $infile = shift;
my $featurefile = shift;

my $command = 'overlapFeatures';
my @args = (qw (-a stdin -b), $featurefile);

my ($input, $output);
my $pid = open2($output, $input, $command, @args) 
    or die "Failed with error $!\n";

my $thread = async {
    print join(",", qw(seqid start end strand read feature name)),"\n";
    for(;;) {
        my $line = <$output>; # should block here and wait for output?
        last if !defined $line; # end of stream reached?
        print STDERR "Got line $line\n";
        # Do some format conversion...
        chomp $line;
        my @cols = split /\t/, $line;
        # print the converted line to STDOUT
        print join(",",@cols[0,1,2,5,3,8]),"\n";
    }
    close($output)
};

{
    open (my $infh, '<', $infile) or die "Can't open $infile\n";
    while (<$infh>){
        # format conversion for the tool's input
        chomp;
        my @cols = split /\t/;
        print $input join ("\t", @cols[0,2,3,1,5,4]),"\n";
    }
    close ($input);
}

$thread->join();
waitpid ($pid, 0);

However, the script still gets stuck in the same way, and so am I. I also can't work out how to use IPC::Run in this case.

What am I doing wrong? Have I misunderstood threading?


EDIT: After more debugging (and help from amon), I found that I am able to retrieve lines from $output. However, the script never finishes: it appears to hang after all the output has been received. I think this is my only remaining issue.

Upvotes: 3

Views: 363

Answers (1)

amon

Reputation: 57640

This is more like a long comment.

I tried your code in a stripped-down version. I removed the conversion code, used the Unix yes command as an infinite data source, and sent the output to /dev/null, since we are not interested in the output here, only in whether the program keeps working. As a replacement for your overlapFeatures, I used cat to pass the data through unchanged.

use strict; use warnings; use IPC::Open2; use threads;

my $command = "cat";
my @args = ();

my ($input, $output);
my $pid = open2($output, $input, $command, @args) 
  or die "Failed with error $!\n";

my $thread = async {
  # reader: drain cat's output into /dev/null
  open my $null, ">", "/dev/null" or die;
  print $null $_ while defined($_ = <$output>);
  close($output)
};

{
  my $c=0;
  open (my $infh, "-|", "yes") or die;
  while (<$infh>){
    $c++;
    # writer: feed every line from yes into cat
    print $input $_;
    if ($c >= 1_000_000) {
      print "\n==another million==\n\n";
      $c=0
    }
  }
  close ($input);
}

$thread->join();
waitpid ($pid, 0);

Once in a million lines (literally), I print a status message to assert that the IO is still working.

Result

Tested on Ubuntu Linux with Perl 12.4, the script works flawlessly. It is therefore reasonable to assume that the problem lies not in the IPC code, but in the data format conversion, in the program you are wrapping, or in the sheer amount of data. (yes outputs the string "y\n", which makes for many lines of very little data each: about 2 MB per million-line group at 2 bytes per line.)

Conclusion

It could be that you are running a different configuration. If you are running a *nix, please confirm that the script I used works for you as well. If not, please state your configuration explicitly and try running an equivalent script.

It would also be possible to split your wrapper into two scripts, at least for testing, so you would run something like

$ convert-to | overlapFeatures | convert-from

This delegates all the IPC to the shell and will verify that the conversions work and that the architecture is implementable.
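For that test, the two conversions could be pulled out of the wrapper into a small filter script. A minimal sketch, assuming a hypothetical script name convert.pl and hypothetical sub names to_tool and from_tool; only the column orders are copied from the question's code.

```perl
use strict;
use warnings;

# Hypothetical filter script (e.g. convert.pl). Sub names are illustrative;
# the column orders are the ones used in the question.

# Input side: one tab-separated line -> the column order fed to overlapFeatures.
sub to_tool {
    my ($line) = @_;
    chomp $line;
    my @cols = split /\t/, $line;
    return join("\t", @cols[0, 2, 3, 1, 5, 4]) . "\n";
}

# Output side: one tab-separated line from overlapFeatures -> a CSV line.
sub from_tool {
    my ($line) = @_;
    chomp $line;
    my @cols = split /\t/, $line;
    return join(",", @cols[0, 1, 2, 5, 3, 8]) . "\n";
}

# Act as a filter only when a mode is given on the command line, e.g.
#   perl convert.pl to < infile | overlapFeatures -a stdin -b featurefile | perl convert.pl from
if (@ARGV && !caller) {
    my $mode = shift @ARGV;
    my $convert = $mode eq 'to'   ? \&to_tool
                : $mode eq 'from' ? \&from_tool
                :                   die "usage: $0 to|from\n";
    print STDOUT $convert->($_) while <STDIN>;
}
```

Keeping the conversions as plain subs also means they can be checked on single lines, independently of any pipes.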

Other, less likely ideas I brainstormed:

(1) When is the close operation performed? Could it be that, for some odd reason, one of the loops exits prematurely? A print STDERR "Closing down xx\n" before each close would be informative.
(2) Do open2 and async successfully spawn their process and thread and return control? Paranoid me would put another print STDERR after each of them…
(3) Do you get any data out of your script at all, or does the stream dry up after some time?

Edit

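A pipe doesn't yield EOF until all of its write ends are closed. The async thread is created while $input is still open, so it holds its own clone of that handle, and close($input) in the main thread does not actually close the write end of the pipe. overlapFeatures therefore never sees EOF on its stdin, never exits, and the reading thread waits on $output forever. Each thread should close the handles it isn't using: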

my $thread = async {
  close $input;    # this thread only reads
  open my $null, ">", "/dev/null" or die;
  print $null $_ while defined($_ = <$output>);
  close($output)
};

and

{
  close $output;   # the main thread only writes
  my $c=0;
  open (my $infh, "-|", "yes") or die;
  while (<$infh>){
    $c++;
    print $input $_;
    ...
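If your perl is built without thread support, or you would rather avoid threads, the same rule applies to a forked writer process: every process must close the pipe ends it does not use. The following is a sketch of that fork-based alternative, not the answer's code; the sub name filter_through and its producer/consumer callbacks are hypothetical.

```perl
use strict;
use warnings;
use IO::Handle;
use IPC::Open2;
use POSIX ();

# Stream data through an external command without threads: a forked child
# writes, the parent reads. $producer receives the command's stdin handle;
# $consumer is called once per line of the command's stdout.
# Returns the command's exit status.
sub filter_through {
    my ($cmd, $producer, $consumer) = @_;
    my $pid = open2(my $out, my $in, @$cmd);    # open2 dies on failure

    STDOUT->flush;    # don't hand unflushed output to the child
    my $writer = fork() // die "fork failed: $!\n";
    if ($writer == 0) {
        # Child: writer only.
        close $out;
        my $ok = eval { $producer->($in); 1 };
        warn $@ unless $ok;
        # This close, plus the parent's, releases every write end: the command sees EOF.
        close $in or $ok = 0;
        POSIX::_exit($ok ? 0 : 1);    # skip END blocks and buffers inherited from the parent
    }

    # Parent: reader only. Without this close the command would never see EOF.
    close $in;
    while (my $line = <$out>) {
        $consumer->($line);
    }
    close $out;
    waitpid($writer, 0);
    waitpid($pid, 0);
    return $? >> 8;
}
```

Applied to the question, $cmd would be ['overlapFeatures', qw(-a stdin -b), $featurefile], the producer would contain the input-side loop (printing to the handle it receives instead of $input), and the consumer the output-side formatting.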

Upvotes: 1
