Reputation: 11087
I want to build a data pipeline that performs a series of operations on rows of a data series.
Most functions will work on a one-row-in, one-row-out basis, but some of those operations will "expand" the series - by which I mean one row will go into the function and more than one row may be generated as a result of that function.
I want to set up a chain of functions that are robust enough to handle this behavior themselves, without my having to write a bunch of oversight code.
Using yield occurred to me as an opportunity: if each function consumed the values yielded by the previous function and was itself a generator, I could chain together an arbitrary number of these well-formed functions, which would be nice from an elegance point of view.
Here's my setup code, with func_x acting as a simple 1-1 function and func_y doing an expansion.
from collections import OrderedDict

data_source = [OrderedDict({"id": "1", "name": "Tom", "sync": "a"}),
               OrderedDict({"id": "2", "name": "Steve", "sync": "a"}),
               OrderedDict({"id": "3", "name": "Ulrich", "sync": "b"}),
               OrderedDict({"id": "4", "name": "Victor", "sync": "b"}),
               OrderedDict({"id": "5", "name": "Wolfgang", "sync": "c"}),
               OrderedDict({"id": "6", "name": "Xavier", "sync": "c"}),
               OrderedDict({"id": "7", "name": "Yves", "sync": "c"}),
               OrderedDict({"id": "8", "name": "Zaphod", "sync": "d"})]

def row_getter(source):
    for content in source:
        yield content.copy()

def func_x(row):
    try:
        q = next(row)
        if q['name'] == "Tom":
            q['name'] = "Richard"
        yield q.copy()
    except StopIteration:
        print("Stop x")

def func_y(row):
    try:
        q = next(row)
        for thingy in range(0, 2):
            q['thingy'] = thingy
            yield q.copy()
    except StopIteration:
        print("Stop y")

rg = row_getter(data_source)
iter_func = func_y(func_x(rg))
Now, I can get the first set of data by iterating through the iter_func object:
print (next(iter_func))
>> OrderedDict([('id', '1'), ('name', 'Richard'), ('sync', 'a'), ('thingy', 0)])
And again:
print (next(iter_func))
>> OrderedDict([('id', '1'), ('name', 'Richard'), ('sync', 'a'), ('thingy', 1)])
And again, though this time, instead of seeing the record for Steve (i.e. the next record in the flow, now that the expansion of func_y on the first record is complete), I get a StopIteration error.
print (next(iter_func))
>> StopIteration Traceback (most recent call last)
<ipython-input-15-0fd1ed48c61b> in <module>()
----> 1 print (next(iter_func))
StopIteration:
So I don't understand where this is coming from, since I've attempted to trap these in both func_x and func_y.
Upvotes: 1
Views: 309
Reputation: 531708
Built-in tools (specifically map and itertools.chain) can do this for you.
from collections import OrderedDict
from itertools import chain

data_source = [OrderedDict({"id": "1", "name": "Tom", "sync": "a"}),
               OrderedDict({"id": "2", "name": "Steve", "sync": "a"}),
               OrderedDict({"id": "3", "name": "Ulrich", "sync": "b"}),
               OrderedDict({"id": "4", "name": "Victor", "sync": "b"}),
               OrderedDict({"id": "5", "name": "Wolfgang", "sync": "c"}),
               OrderedDict({"id": "6", "name": "Xavier", "sync": "c"}),
               OrderedDict({"id": "7", "name": "Yves", "sync": "c"}),
               OrderedDict({"id": "8", "name": "Zaphod", "sync": "d"})]

def rename(d):
    if d['name'] == "Tom":
        d['name'] = "Richard"
    return d

def add_thingy(d):
    for y in range(2):
        yield {'thingy': y, **d}

for x in chain.from_iterable(add_thingy(d)
                             for d in map(rename,
                                          data_source)):
    print(x)
map isn't really necessary; we can apply rename to each dict before we pass it to add_thingy in the generator expression:

for x in chain.from_iterable(add_thingy(rename(d)) for d in data_source):
    print(x)

or go the other way and use map twice:

for x in chain.from_iterable(map(add_thingy, map(rename, data_source))):
    print(x)
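The same map plus chain.from_iterable idea can be generalized to any number of stages. The following is only a sketch under assumptions: flat_map and pipeline are hypothetical helper names (not standard-library functions), every stage is assumed to return an iterable of rows (so a 1-1 stage returns a one-element list), and the data source is shortened to two plain dicts.

```python
from functools import reduce
from itertools import chain

def flat_map(func, rows):
    # Hypothetical helper: apply func to each row and flatten the results.
    return chain.from_iterable(map(func, rows))

def pipeline(rows, *stages):
    # Hypothetical helper: feed the output of each stage into the next one.
    return reduce(lambda acc, stage: flat_map(stage, acc), stages, rows)

def rename(d):
    # 1-1 stage: returns a one-element list, without mutating the input.
    if d['name'] == "Tom":
        return [{**d, 'name': "Richard"}]
    return [d]

def add_thingy(d):
    # Expanding stage: two output rows per input row.
    for y in range(2):
        yield {'thingy': y, **d}

# Shortened stand-in for data_source (an assumption for brevity).
rows = [{"id": "1", "name": "Tom"}, {"id": "2", "name": "Steve"}]

out = list(pipeline(rows, rename, add_thingy))
for x in out:
    print(x)
```

Because every stage has the same "one row in, iterable of rows out" shape, stages can be added, removed, or reordered without any per-stage plumbing.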
Upvotes: 1
Reputation: 59731
Your func_x function only yields one item, so it finishes once that item has been consumed. Try something like this:
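def func_x(row):
    try:
        # Loop over every incoming row instead of calling next() once.
        for q in row:
            if q['name'] == "Tom":
                q['name'] = "Richard"
            yield q
    except StopIteration:
        # The for loop ends on its own when row is exhausted,
        # so this handler is not expected to run.
        print("Stop x")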
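The func_y in the question has the same shape (a single next() call), so it would need the same change. A minimal sketch of the whole pipeline with both stages looping over their input; the two-row data source of plain dicts is a shortened stand-in used here for brevity, and the try/except blocks are left out on the assumption that the for loops handle exhaustion:

```python
def row_getter(source):
    # Yield a copy of each row so the source list is never mutated.
    for content in source:
        yield content.copy()

def func_x(rows):
    # One row in, one row out.
    for q in rows:
        if q['name'] == "Tom":
            q['name'] = "Richard"
        yield q

def func_y(rows):
    # One row in, two rows out.
    for q in rows:
        for thingy in range(2):
            q['thingy'] = thingy
            yield q.copy()

# Shortened stand-in for the question's data_source.
data_source = [
    {"id": "1", "name": "Tom", "sync": "a"},
    {"id": "2", "name": "Steve", "sync": "a"},
]

results = list(func_y(func_x(row_getter(data_source))))
for row in results:
    print(row)
```

With this shape, the third next() call on the chained generator moves on to Steve's record instead of ending the iteration.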
By the way, note that yield itself does not copy the object; the func_x above yields q directly rather than q.copy(). This is fine in many cases, but it matters in a function like func_y, which yields a row twice with 'thingy' set to a different value each time. The func_y you posted yields q.copy(), so each result is a separate object. If it yielded q itself, then after doing:
d1 = next(iter_func)
d2 = next(iter_func)
d1 and d2 would be the same object, and in particular both would have 'thingy' set to 1.
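The difference between yielding an object and yielding a copy of it can be seen in isolation. A small sketch, where yield_same and yield_copies are hypothetical names made up for this illustration:

```python
def yield_same(row):
    # Yields the same dict object twice, mutating it in between.
    for thingy in range(2):
        row['thingy'] = thingy
        yield row

def yield_copies(row):
    # Yields an independent snapshot on each iteration.
    for thingy in range(2):
        row['thingy'] = thingy
        yield row.copy()

a1, a2 = yield_same({"id": "1"})
b1, b2 = yield_copies({"id": "1"})

print(a1 is a2, a1['thingy'], a2['thingy'])  # True 1 1
print(b1 is b2, b1['thingy'], b2['thingy'])  # False 0 1
```

Copying costs a little per row, but it keeps rows that were already handed downstream from changing later.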
Upvotes: 1