Vor

Reputation: 35149

Pass real data to a Storm spout using a non-JVM language in Twitter Storm

I'm having trouble understanding how to pass real data to the spout. For example:

I have these two files (they are working fine):

#! /usr/bin/env python

import os, random, sys, time

for i in xrange(50):
    print("%s\t%s"%(os.getpid(), i))
    sys.stdout.flush()
    time.sleep(random.randint(0,5))

And

#! /usr/bin/env python

from __future__ import print_function
from select import select
from subprocess import Popen,PIPE

p = Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True,  universal_newlines=True) 

timeout = 0.1 # seconds
while p:
    # remove finished processes from the list 
    if p.poll() is not None: # process ended
        print(p.stdout.read(), end='') # read the rest
        p.stdout.close()
        processes.remove(p)

    # wait until there is something to read
    rlist = select([p.stdout], [],[], timeout)[0]

    # read a line from each process that has output ready
    for f in rlist:
        print(f.readline(), end='') #NOTE: it can block

Now imagine that I want to pass those random lines to the spout for further processing. I tried this:

from uuid import uuid4
from select import select
from subprocess import Popen,PIPE
import storm

class TwitterSpout(storm.Spout):

    def initialize(self, conf, context):
        self.pid = os.getpid()
        try:
            self.p= Popen(['./rand_lines.py'], stdout=PIPE, bufsize=1, close_fds=True,  universal_newlines=True)
        except OSError, e:
            self.log('%s'%e)
            sys.exit(1)

and then in nextTuple():

def nextTuple(self):
    timeout = 0.1 # seconds
    while self.p:
        # remove finished processes from the list 
        if self.p.poll() is not None: # process ended
            self.log ("%s"%self.p.stdout.read()) # read the rest
            self.p.stdout.close()
            processes.remove(self.p)

        # wait until there is something to read
        rlist = select([self.p.stdout], [],[], timeout)[0]

        # read a line from each process that has output ready
        for f in rlist:
            self.log ("%s%s"%f.readline()) #NOTE: it can block
            msgId = random.randint(0,500)
            self.log('MSG IN SPOUT %s\n'%msgId)
            storm.emit([f.readline()], id=msgId)

But this structure doesn't work. I always get the error "Pipe seems to be broken...", or, if I try different variations of this code, the process blocks and Storm never reaches nextTuple(). Please help me solve this problem. An example of how to do something similar, or just some advice, would also help. Thank you.

Upvotes: 4

Views: 616

Answers (1)

jfs

Reputation: 414715

There could be multiple issues.

There is no break in the while loop, so it runs forever and nextTuple() never returns.

You call f.readline() twice. You probably meant to call it only once after each select.

To avoid blocking, use data = os.read(f.fileno(), 1024) after select.

I don't know whether it is acceptable to block nextTuple() until the child process exits.
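The first three points can be combined into one small reader. The following is only a sketch, assuming Python 3 and a POSIX system (select() on pipes does not work on Windows). `LineReader` and `poll_lines` are hypothetical names, not part of storm.py. Each call waits at most `timeout` seconds, reads once with os.read(), and returns whatever complete lines have arrived:

```python
import os
import sys
from select import select
from subprocess import Popen, PIPE, DEVNULL


class LineReader(object):
    """Hypothetical helper: poll a child's stdout without blocking on readline()."""

    def __init__(self, args):
        # stdin=DEVNULL keeps the child away from the spout's own stdin
        self.p = Popen(args, stdin=DEVNULL, stdout=PIPE, close_fds=True)
        self.fd = self.p.stdout.fileno()
        self.buf = b''
        self.eof = False

    def poll_lines(self, timeout=0.1):
        """Return the complete lines that are available within `timeout` seconds."""
        if not self.eof and select([self.fd], [], [], timeout)[0]:
            data = os.read(self.fd, 1024)  # returns at once with what is there
            if data:
                self.buf += data
            else:  # an empty read means the child closed its stdout
                self.eof = True
                self.p.stdout.close()
                self.p.wait()
        lines = self.buf.split(b'\n')
        self.buf = lines.pop()  # keep the trailing partial line for later
        if self.eof and self.buf:  # flush an unterminated last line
            lines.append(self.buf)
            self.buf = b''
        return [l.decode('ascii', 'ignore') for l in lines]
```

With this, initialize() could create `self.reader = LineReader(['./rand_lines.py'])`, and nextTuple() could call storm.emit([line]) for each line returned by `self.reader.poll_lines()` and then return, instead of looping.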

If all you do is read lines from the subprocess then you don't need the select:

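import os
from subprocess import Popen, PIPE

def iter_lines(*args, DEVNULL=open(os.devnull, 'r+')):
    p = Popen(args, stdin=DEVNULL, stdout=PIPE, stderr=DEVNULL,
              bufsize=1, close_fds=True)
    for line in iter(p.stdout.readline, b''): # expect b'\n' newline
        yield line
    p.stdout.close()
    # Python 3.3+: `return value` in a generator sets StopIteration.args[0];
    # an explicit `raise StopIteration` becomes RuntimeError on Python 3.7+
    return p.wait()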

Example:

# ...
self.lines = iter_lines(sys.executable, '-u', 'rand_lines.py')

#...
def nextTuple(self):
    try:
        line = next(self.lines).decode('ascii', 'ignore')
    except StopIteration as e:
        # the first StopIteration carries the exit status; later ones have no args
        if e.args:
            self.exit_status = e.args[0]
    else:
        storm.emit([line.strip()])
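Note that next(self.lines) blocks until the child prints a line. If that turns out to be a problem, one alternative is to read the child's output on a background thread and hand the lines over through a queue. This is a different technique from the generator above, sketched here assuming Python 3; `start_reader` and `next_line` are hypothetical helper names:

```python
import sys
import threading
from queue import Queue, Empty
from subprocess import Popen, PIPE, DEVNULL


def start_reader(args):
    """Hypothetical helper: run `args` and collect its output lines in a queue."""
    # stdin=DEVNULL keeps the child away from the spout's own stdin
    p = Popen(args, stdin=DEVNULL, stdout=PIPE, close_fds=True)
    q = Queue()

    def pump():
        # blocking readline() is fine here: it runs off the main thread
        for line in iter(p.stdout.readline, b''):
            q.put(line.decode('ascii', 'ignore').strip())
        p.stdout.close()
        p.wait()

    t = threading.Thread(target=pump)
    t.daemon = True  # do not keep the spout alive because of the reader
    t.start()
    return q, t


def next_line(q):
    """Return the next queued line, or None if nothing is ready yet."""
    try:
        return q.get_nowait()
    except Empty:
        return None
```

In this variant nextTuple() would call `next_line(self.q)` and emit only when the result is not None, so it always returns immediately.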

Upvotes: 1
