Reputation: 2697
A few weeks ago I asked a question about implementing a non-blocking one-parent, many-children pipe, which was ably answered by @mob here.
However, I've noticed that if a child posts more than one message before exiting, the parent only gets the first one if it reads a little later.
Example code:
use IO::Handle;
use POSIX ':sys_wait_h';
pipe(READER,WRITER);
WRITER->autoflush(1);
sub child_process {
    close READER; # also a best but optional practice
    srand($$);
    my $id = 0;
    sleep 1 + 5*rand();
    $id++;
    print "Child Pid $$ sending message $id now...\n";
    print WRITER "$id:Child Pid $$ is sending this - Message 1\n";
    print WRITER "$id:Child Pid $$ is sending this - Message 2\n";
    exit 0;
}
if (fork() == 0) {
    child_process();
}
# parent
my ($rin,$rout) = ('');
vec($rin,fileno(READER),1) = 1;
while (1) {
    # non-blocking read on pipe
    my $read_avail = select($rout=$rin, undef, undef, 0.0);
    if ($read_avail < 0) {
        if (!$!{EINTR}) {
            warn "READ ERROR: $read_avail $!\n";
            last;
        }
    } elsif ($read_avail > 0) {
        chomp(my $line = <READER>);
        print "Parent Got $$: '$line'\n";
    } else {
        print STDERR "No input ... do other stuff\n";
    }
    sleep 5;
}
close WRITER; # now it is safe to do this ...
Expected output:
I should get both messages.
What I get: Only the first message
No input ... do other stuff
No input ... do other stuff
Child Pid 8594 sending message 1 now...
Parent Got 8593: '1:Child Pid 8594 is sending this - Message 1'
No input ... do other stuff
This is supposed to be a non-blocking read, so how come it can't pick up the data on the next iteration? Is it because the child exited? I tried doing a while (chomp(my $line = <READER>)) in the parent, but that blocks, which I can't have.
Upvotes: 3
Views: 691
Reputation: 118635
Looks like you are mixing buffered and unbuffered I/O.
<READER> (and readline(READER)) are buffered input operations. The first time you call readline on a file handle, Perl will try to read up to 8K of data from the handle, saving most of it in a memory buffer. The next time you call readline on the same file handle, Perl will try to return data in the buffer before it tries to read more data from the file again. This is for efficiency.
select is an operation for unbuffered I/O. It tells you whether input is waiting on the file handle itself, but it cannot see whether data is waiting in the buffer.
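The mismatch described above can be reproduced in a few lines. This is a minimal sketch, not code from the question: the helper name buffered_vs_select is made up for illustration, and it assumes both lines fit in a single pipe read, which should hold for a few bytes on typical systems.

```perl
use strict;
use warnings;

# Hypothetical helper: returns (first line, number of handles select
# reports ready, second line).
sub buffered_vs_select {
    pipe(my $reader, my $writer) or die "pipe failed: $!";

    # Put two complete lines into the pipe with one unbuffered write.
    defined(syswrite($writer, "one\ntwo\n")) or die "syswrite failed: $!";

    # readline pulls everything available (both lines) into Perl's buffer
    # and hands back only the first line.
    my $first = <$reader>;

    # The pipe itself is now empty, so a zero-timeout select sees nothing.
    my $rin = '';
    vec($rin, fileno($reader), 1) = 1;
    my $nready = select(my $rout = $rin, undef, undef, 0);

    # The second line is still served from the buffer, without blocking.
    my $second = <$reader>;

    return ($first, $nready, $second);
}

my ($first, $nready, $second) = buffered_vs_select();
chomp($first, $second);
print "first='$first' select=$nready second='$second'\n";
```

Here select reports zero ready handles even though a second line is still waiting in Perl's buffer, which matches the symptom in the question: the parent sees "No input" while Message 2 sits unread.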
A clunky alternative would be to use sysread to extract data from the pipe (getc goes through the same buffered layer as readline, so it has the same problem with select). That is inconvenient because you would have to break the input into separate lines yourself.
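... if ($read_avail > 0) {
    # sysread bypasses Perl's buffer, so it agrees with what select reported
    my $n = sysread READER, my $lines, 16384;
    # note: the final chunk may be a partial line if the writer is mid-message
    chomp($lines);
    my @lines = split /\n/, $lines;
    print "Parent Got $$: '$_'\n" for @lines;
} ...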
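Splitting the input into lines yourself also means holding on to a trailing partial line until the rest of it arrives. A rough sketch of one way to do that follows; the helper name read_available_lines, its (lines, EOF flag) return convention, and the 16384-byte chunk size are assumptions made for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper (not from the answer): drain whatever is readable right
# now with sysread and keep any trailing partial line in $$buf_ref for the
# next call. Returns (array ref of complete lines, EOF flag).
sub read_available_lines {
    my ($fh, $buf_ref) = @_;
    $$buf_ref = '' unless defined $$buf_ref;
    my $eof = 0;

    my $rin = '';
    vec($rin, fileno($fh), 1) = 1;

    # Keep reading while a zero-timeout select says data (or EOF) is waiting.
    while (select(my $rout = $rin, undef, undef, 0) > 0) {
        my $n = sysread($fh, $$buf_ref, 16384, length $$buf_ref);
        die "sysread failed: $!" unless defined $n;
        if ($n == 0) { $eof = 1; last }   # writer closed its end
    }

    # Peel complete lines off the front of the buffer.
    my @lines;
    push @lines, $1 while $$buf_ref =~ s/\A([^\n]*)\n//;

    return (\@lines, $eof);
}

# Usage: a message split across two writes is reassembled.
pipe(my $reader, my $writer) or die "pipe failed: $!";
my $buffer = '';
syswrite($writer, "msg 1\nmsg 2\npart");
my ($lines) = read_available_lines($reader, \$buffer);
print "got: $_\n" for @$lines;
syswrite($writer, "ial\n");
($lines) = read_available_lines($reader, \$buffer);
print "got: $_\n" for @$lines;
```

With several children you would keep one buffer per pipe. The EOF flag tells the parent when a writer has closed its end, so that handle can be dropped from the select set.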
What might work is to read from the file handle in list context.
chomp(my @lines = <READER>);
seek READER, 0, 1;
should read all available data from both the buffer and the filehandle, and in theory it will leave your buffer empty so the next <READER> call will be like an unbuffered read. (The seek statement clears the EOF condition on the filehandle so you can read from the filehandle later when more input arrives).
(ETA: nah, that won't work. That will just block on READER until the child closes its end of the pipe)
The docs for select have this warning:
WARNING: One should not attempt to mix buffered I/O (like "read" or <FH>) with "select", except as permitted by POSIX, and even then only on POSIX systems. You have to use "sysread" instead.
Upvotes: 3
Reputation: 2697
Ok, I seem to see the benefit of @Grinnz's first recommendation to use a well defined framework. I thought I needed a tricycle but it looks like I'm slowly constructing a BMW from nuts and bolts.
@mob's and @Grinnz's suggestions were right. It was a case of buffered vs. unbuffered I/O.
chomp(my @lines = <READER>);
seek READER, 0, 1;
does not work. It blocks.
This cookbook recipe works, but I'll tweak it/test it further tomorrow (source). So far so good:
use IO::Handle;
use POSIX ':sys_wait_h';
use Symbol qw(qualify_to_ref);
use IO::Select;
pipe(READER,WRITER);
WRITER->autoflush(1);
sub sysreadline(*;$) {
    my($handle, $timeout) = @_;
    $handle = qualify_to_ref($handle, caller( ));
    my $infinitely_patient = (@_ == 1 || $timeout < 0);
    my $start_time = time( );
    my $selector = IO::Select->new( );
    $selector->add($handle);
    my $line = "";
SLEEP:
    until (at_eol($line)) {
        unless ($infinitely_patient) {
            return $line if time( ) > ($start_time + $timeout);
        }
        # sleep only 1 second before checking again
        next SLEEP unless $selector->can_read(1.0);
INPUT_READY:
        while ($selector->can_read(0.0)) {
            my $was_blocking = $handle->blocking(0);
CHAR:       while (sysread($handle, my $nextbyte, 1)) {
                $line .= $nextbyte;
                last CHAR if $nextbyte eq "\n";
            }
            $handle->blocking($was_blocking);
            # if incomplete line, keep trying
            next SLEEP unless at_eol($line);
            last INPUT_READY;
        }
    }
    return $line;
}
sub at_eol($) { $_[0] =~ /\n\z/ }
sub child_process {
    close READER; # also a best but optional practice
    srand($$);
    my $id = 0;
    sleep 1 + 5*rand();
    $id++;
    print "Child Pid $$ sending message $id now...\n";
    print WRITER "$id:Child Pid $$ is sending this - Message 1\n";
    print WRITER "$id:Child Pid $$ is sending this - Message 2\n";
    exit 0;
}
if (fork() == 0) {
    child_process();
}
# parent
my ($rin,$rout) = ('');
vec($rin,fileno(READER),1) = 1;
while (1) {
    # non-blocking read on pipe
    while ((my $read_avail = select($rout=$rin, undef, undef, 0.0)) !=0)
    {
        if ($read_avail < 0) {
            if (!$!{EINTR}) {
                warn "READ ERROR: $read_avail $!\n";
                last;
            }
        }
        elsif ($read_avail > 0) {
            chomp(my $line = sysreadline(READER));
            print "Parent Got $$: '$line'\n";
            print "END MESSAGE\n";
        }
    }
    print STDERR "input queue empty...\n";
    print "Sleeping main for 5...\n";
    sleep 5;
}
close WRITER; # now it is safe to do this ...
Upvotes: 1
Reputation: 9231
You are only reading up to one line per iteration, rather than reading all of the available data on the pipe. Perhaps select() is not indicating it's readable anymore after that. Note that since you're forking, you also need to reap the subprocess with waitpid after it exits (in blocking mode, waitpid will wait for it to exit). waitpid returns the PID of the reaped child and sets $? to its wait status; the child's exit code is $? >> 8.
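Since the parent in the question must not block, reaping can also be done by polling with the WNOHANG flag. A minimal sketch, assuming a hypothetical run_and_reap helper whose child exits immediately with a chosen code; in a real program the polling would happen inside the main loop rather than in a dedicated one.

```perl
use strict;
use warnings;
use POSIX ':sys_wait_h';

# Hypothetical demo: fork a child that exits with $code, then poll for it
# with a non-blocking waitpid and return its exit code.
sub run_and_reap {
    my ($code) = @_;
    my $pid = fork();
    die "fork failed: $!" unless defined $pid;
    POSIX::_exit($code) if $pid == 0;        # child: exit right away

    while (1) {
        my $reaped = waitpid($pid, WNOHANG);
        die "waitpid failed: $!" if $reaped == -1;
        return $? >> 8 if $reaped == $pid;   # exit code is the high byte of $?
        # waitpid returned 0: child still running. Do other work, then retry.
        select(undef, undef, undef, 0.05);
    }
}

print "child exit code: ", run_and_reap(3), "\n";
```

The child uses POSIX::_exit here so that it leaves without running the parent's END blocks or flushing inherited output buffers; a plain exit would also work in a standalone script.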
I recommend using an event loop to manage pipes between processes as it and its helper modules will manage all of the weird details of both forking a process and exchanging data. Here's what something using IO::Async might look like.
use strict;
use warnings;
use IO::Async::Loop;
use IO::Async::Channel;
use IO::Async::Routine;
my $channel = IO::Async::Channel->new;
sub child_process {
    my $id = 0;
    sleep 1 + 5*rand();
    $id++;
    print "Child Pid $$ sending message $id now...\n";
    $channel->send(\"$id:Child Pid $$ is sending this - Message 1\n");
    $channel->send(\"$id:Child Pid $$ is sending this - Message 2\n");
}
my $loop = IO::Async::Loop->new;
my $f = $loop->new_future;
my $routine = IO::Async::Routine->new(
    channels_out => [$channel],
    code => \&child_process,
    on_return => sub { my $routine = shift; $f->done(@_) },
    on_die => sub { my $routine = shift; $f->fail(@_) },
);
$loop->add($routine);
$channel->configure(on_recv => sub {
    my ($channel, $ref) = @_;
    print "Parent Got: '$$ref'\n";
});
# wait for Future to complete (process finishes) or fail (process fails to start or dies)
my $exitcode = $f->get;
print "Child exited with exit code $exitcode\n";
Note that IO::Async::Channel is just an abstraction around IO::Async::Stream for sending data structures between processes, and IO::Async::Routine is an abstraction around IO::Async::Process (or a thread, on Windows systems) for setting up Channels to the forked code. IO::Async::Function additionally is a higher-level wrapper of IO::Async::Routine that can manage a fork/thread pool to run a subroutine many times with different input and receive the return values in the parent. So there are many levels you can make use of depending how deep you want to dive.
Upvotes: 2