Thomas Kimber

Reputation: 11087

Chaining Nested Yields

I want to build a data pipeline that performs a series of operations on rows of a data series.

Most functions will work on a one-row-in, one-row-out basis, but some of those operations will "expand" the series - by which I mean one row will go into the function and more than one row may be generated as a result of that function.

I want to set up a chain of functions that is robust enough to handle this behavior itself, without my having to write a bunch of oversight code.

Using yield seemed like a good fit: if each function consumes the values yielded by the previous function and is itself a generator, then I can chain together any number of these well-formed functions, which would be nice from an elegance point of view.

Here's my setup code with func_x acting as a simple 1-1 function, and func_y doing an expansion.

from collections import OrderedDict
data_source = [ OrderedDict({"id" : "1", "name" : "Tom", "sync" : "a"}),
            OrderedDict({"id" : "2", "name" : "Steve", "sync" : "a"}),
            OrderedDict({"id" : "3", "name" : "Ulrich", "sync" : "b"}),
            OrderedDict({"id" : "4", "name" : "Victor", "sync" : "b"}),
            OrderedDict({"id" : "5", "name" : "Wolfgang", "sync" : "c"}),
            OrderedDict({"id" : "6", "name" : "Xavier", "sync" : "c"}),
            OrderedDict({"id" : "7", "name" : "Yves", "sync" : "c"}),
            OrderedDict({"id" : "8", "name" : "Zaphod", "sync" : "d"})]
def row_getter(source):
    for content in source:
        yield content.copy()

def func_x(row):
    try:
        q=next(row)
        if q['name']=="Tom":
            q['name']="Richard"
        yield q.copy()
    except StopIteration:
        print ("Stop x")


def func_y(row):
    try:
        q=next(row)
        for thingy in range(0,2):
            q['thingy']=thingy
            yield q.copy()
    except StopIteration:
        print ("Stop y")

rg = row_getter(data_source)
iter_func = func_y(func_x(rg))

Now, I can get the first set of data by iterating through the iter_func object:

print (next(iter_func))
>> OrderedDict([('id', '1'), ('name', 'Richard'), ('sync', 'a'), ('thingy', 0)])

And again:

print (next(iter_func))
>> OrderedDict([('id', '1'), ('name', 'Richard'), ('sync', 'a'), ('thingy', 1)])

And again. This time, though, instead of seeing the record for Steve (i.e. the next record in the flow, now that func_y has finished expanding the first record), I get a StopIteration error.

print (next(iter_func))
>> StopIteration                             Traceback (most recent call last)
<ipython-input-15-0fd1ed48c61b> in <module>()
----> 1 print (next(iter_func))

StopIteration: 

So I don't understand where this is coming from, since I've attempted to trap these in both func_x and func_y.

Upvotes: 1

Views: 309

Answers (2)

chepner

Reputation: 531708

Built-in tools (specifically map and itertools.chain) can do this for you.

from collections import OrderedDict
from itertools import chain


data_source = [ OrderedDict({"id" : "1", "name" : "Tom", "sync" : "a"}),
            OrderedDict({"id" : "2", "name" : "Steve", "sync" : "a"}),
            OrderedDict({"id" : "3", "name" : "Ulrich", "sync" : "b"}),
            OrderedDict({"id" : "4", "name" : "Victor", "sync" : "b"}),
            OrderedDict({"id" : "5", "name" : "Wolfgang", "sync" : "c"}),
            OrderedDict({"id" : "6", "name" : "Xavier", "sync" : "c"}),
            OrderedDict({"id" : "7", "name" : "Yves", "sync" : "c"}),
            OrderedDict({"id" : "8", "name" : "Zaphod", "sync" : "d"})]


def rename(d):
    if d['name'] == "Tom":
        d['name'] = "Richard"
    return d


def add_thingy(d):
    for y in range(2):
        yield {'thingy': y, **d}

for x in chain.from_iterable(add_thingy(d) 
                             for d in map(rename,
                                          data_source)):
    print(x)

map isn't really necessary; we can apply rename to each dict before we pass it to add_thingy in the generator expression.

for x in chain.from_iterable(add_thingy(rename(d)) for d in data_source):
    print(x)

Or go the other way and use map twice:

for x in chain.from_iterable(map(add_thingy, map(rename, data_source))):
    print(x)
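
If the chain grows beyond two steps, the same idea can be wrapped in a loop. The following is only a sketch: lift and pipeline are hypothetical helper names, not part of itertools, and rename here returns a new dict instead of mutating its argument (an assumption made so the sample rows stay untouched). The two-row list is shortened sample data.

```python
from itertools import chain


def rename(d):
    # One row in, one row out; builds a new dict rather than mutating the input.
    return {**d, "name": "Richard"} if d["name"] == "Tom" else d


def add_thingy(d):
    # One row in, two rows out.
    for y in range(2):
        yield {**d, "thingy": y}


def lift(func):
    # Hypothetical helper: make a one-in/one-out function yield its single result,
    # so every stage has the same "row -> iterable of rows" shape.
    def stage(row):
        yield func(row)
    return stage


def pipeline(rows, *stages):
    # Hypothetical helper: apply each stage to every row and flatten the results
    # before handing them to the next stage. Everything stays lazy.
    for stage in stages:
        rows = chain.from_iterable(map(stage, rows))
    return rows


rows = [{"id": "1", "name": "Tom"}, {"id": "2", "name": "Steve"}]
result = list(pipeline(rows, lift(rename), add_thingy))
for r in result:
    print(r)
```

map is used inside the loop rather than a generator expression so that each stage is bound when the loop runs, not later when the result is consumed.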

Upvotes: 1

javidcf

Reputation: 59731

Your func_x function calls next(row) only once and yields only one item, so the generator finishes as soon as that item is consumed. func_y has the same shape and needs the same change. Try something like this:

def func_x(row):
    # A for loop ends cleanly when the source is exhausted,
    # so no StopIteration handling is needed here.
    for q in row:
        if q['name'] == "Tom":
            q['name'] = "Richard"
        yield q
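
As posted, func_y calls next(row) once, yields two copies, and then returns. A generator that returns raises StopIteration in whoever called next() on it, which is why a try/except inside the function never sees that exception. Here is a sketch of the whole chain with both functions looping over their input; the two-row data_source is shortened sample data, and plain dicts stand in for OrderedDict.

```python
def row_getter(source):
    # Hand out copies so the pipeline never mutates the source rows.
    for content in source:
        yield content.copy()


def func_x(rows):
    # One row in, one row out, for every incoming row.
    for q in rows:
        if q["name"] == "Tom":
            q["name"] = "Richard"
        yield q


def func_y(rows):
    # Loop over every incoming row instead of calling next() once.
    for q in rows:
        for thingy in range(2):
            q["thingy"] = thingy
            yield q.copy()  # copy so each expanded row is independent


data_source = [{"id": "1", "name": "Tom"}, {"id": "2", "name": "Steve"}]
out = list(func_y(func_x(row_getter(data_source))))
for row in out:
    print(row)
```

With the loops in place, the consumer sees two rows for Richard followed by two rows for Steve, and iteration only stops once row_getter is exhausted.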

By the way, note that yield does not copy the object it yields. Your func_y calls q.copy() before each yield, and that copy is what keeps the expanded rows independent. If it yielded q directly, then after:

d1 = next(iter_func)
d2 = next(iter_func)

d1 and d2 would be the same object, and in particular they would both have 'thingy' set to 1.
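
A small sketch of the difference between yielding the object itself and yielding a copy; expand_shared and expand_copied are hypothetical names used only for this illustration.

```python
def expand_shared(row):
    # Yields the same dict object twice, mutating it in between.
    for thingy in range(2):
        row["thingy"] = thingy
        yield row


def expand_copied(row):
    # Yields an independent copy each time.
    for thingy in range(2):
        row["thingy"] = thingy
        yield row.copy()


gen = expand_shared({"id": "1"})
s1 = next(gen)
s2 = next(gen)
print(s1 is s2, s1["thingy"], s2["thingy"])  # True 1 1

gen = expand_copied({"id": "1"})
c1 = next(gen)
c2 = next(gen)
print(c1 is c2, c1["thingy"], c2["thingy"])  # False 0 1
```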

Upvotes: 1
