Reputation: 35149
I'm having trouble understanding how to pass real data to the Spout. For example:
I have these two files (they are working fine):
#! /usr/bin/env python
import os, random, sys, time
# Print 50 lines of the form "<pid>\t<counter>", sleeping a random whole
# number of seconds (0 to 5) after each one.
for i in range(50):  # range() rather than xrange(): runs on Python 2 and 3
    print("%s\t%s" % (os.getpid(), i))
    # Flush after every line so it is handed to the reader immediately
    # instead of waiting in the output buffer.
    sys.stdout.flush()
    time.sleep(random.randint(0, 5))
And
#! /usr/bin/env python
from __future__ import print_function
from select import select
from subprocess import Popen,PIPE
# Start the line generator with its stdout attached to a pipe.
# universal_newlines=True opens the pipe in text mode, so reads return str.
p = Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True,
          universal_newlines=True)
timeout = 0.1  # seconds to wait in select() before checking the child again

while True:
    if p.poll() is not None:  # process ended
        print(p.stdout.read(), end='')  # read the rest
        p.stdout.close()
        # Only a single child is tracked, so leave the loop here; there is
        # no `processes` list to remove it from, and select() must not be
        # called on the pipe that was just closed.
        break
    # wait until there is something to read
    rlist = select([p.stdout], [], [], timeout)[0]
    # read a line from each pipe that has output ready
    for f in rlist:
        print(f.readline(), end='')  # NOTE: it can block
Now imagine that I want to pass those random lines to the spout for future processing. I was trying this:
from uuid import uuid4
from select import select
from subprocess import Popen,PIPE
import storm
class TwitterSpout(storm.Spout):
    """Spout that takes its input from a ``./rand_lines.py`` child process."""

    def initialize(self, conf, context):
        """Record this process's pid and start the child process.

        ``conf`` and ``context`` are accepted but not used here. If the
        child cannot be started, the error is logged and the process exits
        with status 1.
        """
        self.pid = os.getpid()
        try:
            self.p = Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1,
                           close_fds=True, universal_newlines=True)
        except OSError as e:  # the `as` form is valid on Python 2.6+ and 3
            self.log('%s' % e)
            sys.exit(1)
and then in nextTuple():
def nextTuple(self):
    """Poll the child process's stdout and emit what it produces.

    The loop runs for as long as ``self.p`` is truthy. On each pass it
    first checks whether the child has exited (logging the remaining
    output and closing the pipe if so), then waits up to ``timeout``
    seconds for the pipe to become readable. For every readable pipe one
    line is read for logging and a second line is read for the emitted
    tuple, which is tagged with a random message id.
    """
    timeout = 0.1  # seconds
    while self.p:
        # remove finished processes from the list
        child_exited = self.p.poll() is not None
        if child_exited:
            self.log("%s" % self.p.stdout.read())  # read the rest
            self.p.stdout.close()
            processes.remove(self.p)
        # wait until there is something to read
        ready, _, _ = select([self.p.stdout], [], [], timeout)
        # read a line from each process that has output ready
        for f in ready:
            self.log("%s%s" % f.readline())  # NOTE: it can block
            msg_id = random.randint(0, 500)
            self.log('MSG IN SPOUT %s\n' % msg_id)
            storm.emit([f.readline()], id=msg_id)
But this structure doesn't work: I'm always getting the error "Pipe seems to be broken..."
or, if I try different variations of this code, I am blocking the process and Storm never reaches nextTuple. Please help me to solve my problem, or if someone can give me an example of how to do a similar thing, or just some advice.
Thank you
Upvotes: 4
Views: 616
Reputation: 414715
There could be multiple issues.
There is no `break` in the `while` loop -- infinite loop.
You call `f.readline()` twice. You probably meant to call it only once after each `select`.
To avoid blocking, use `data = os.read(f.fileno(), 1024)` after `select`.
I don't know whether it is acceptable to block `nextTuple()` until the child process exits.
If all you do is read lines from the subprocess then you don't need the `select`:
def iter_lines(*args, DEVNULL=open(os.devnull, 'r+')):
    """Run a command and yield its stdout one line at a time, as bytes.

    ``args`` is the command line: the program followed by its arguments.
    The child's stdin and stderr are both connected to ``DEVNULL``, which
    by default is a handle on ``os.devnull`` opened once, when this
    function is defined.

    When the child's stdout reaches end of file, the pipe is closed, the
    child is waited for, and its exit status becomes the generator's
    return value. A caller driving the generator with ``next()`` sees it
    as ``value`` (and ``args[0]``) of the final ``StopIteration``.
    """
    # No bufsize=1 here: line buffering only applies to text-mode pipes,
    # and this pipe is read as bytes.
    p = Popen(args, stdin=DEVNULL, stdout=PIPE, stderr=DEVNULL,
              close_fds=True)
    for line in iter(p.stdout.readline, b''):  # expect b'\n' newline
        yield line
    p.stdout.close()
    # Hand the exit status back with `return`: an explicit
    # `raise StopIteration(...)` inside a generator is converted to
    # RuntimeError on Python 3.7+ (PEP 479).
    return p.wait()
Example:
# ...
self.lines = iter_lines(sys.executable, '-u', 'rand_lines.py')
#...
def nextTuple(self):
    """Emit the next line of child output as a one-field tuple.

    Takes one item from ``self.lines``, decodes it as ASCII (undecodable
    bytes are dropped), strips surrounding whitespace and emits it. When
    ``self.lines`` is finished nothing is emitted; if the terminating
    ``StopIteration`` carries a value, it is stored in
    ``self.exit_status``.
    """
    try:
        line = next(self.lines).decode('ascii', 'ignore')
    except StopIteration as e:
        # Only the StopIteration that ends the generator carries a value.
        # Every later next() on the finished generator raises a bare
        # StopIteration with empty args, so e.args[0] must not be read
        # unconditionally or repeated calls fail with IndexError.
        if e.args:
            self.exit_status = e.args[0]
    else:
        storm.emit([line.strip()])
Upvotes: 1