Reputation: 8362
I am following the PyData talk at https://youtu.be/R1em4C0oXo8, in which the presenter shows a library for pipelining called yamal. This library is not open source, so as part of learning FP in Python I tried to replicate its basics.
In a nutshell, you build a series of pure functions in Python (f1, f2, f3, etc.) and create a list of them as follows:
pipeline = [f1, f2, f3, f4]
Then you can apply the function run_pipeline, and the result will be the composition:
f4(f3(f2(f1())))
The requirements for the functions are that each has one return value and that each, except f1, takes exactly one input.
This part is easy to implement; I did it by composing the functions:
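def run_pipeline(pipeline):
    # The first step takes no input and produces the initial data
    get_data, *rest_of_steps = pipeline

    def compose(x):
        # Feed the result of each step into the next one
        for f in rest_of_steps:
            x = f(x)
        return x

    data = get_data()
    return compose(data)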
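The same composition can also be written as a fold over the list. This is a minimal sketch, assuming Python 3 and using functools.reduce; the step functions get_ten, inc and double are hypothetical stand-ins for f1, f2 and f3:

```python
from functools import reduce


def run_pipeline(pipeline):
    """Call the first step with no arguments, then feed its result through the rest."""
    get_data, *rest_of_steps = pipeline
    # reduce threads the accumulated value through each remaining step in order
    return reduce(lambda value, step: step(value), rest_of_steps, get_data())


# Hypothetical steps, for illustration only
def get_ten():
    return 10


def inc(x):
    return x + 1


def double(x):
    return x * 2


result = run_pipeline([get_ten, inc, double])
print(result)  # double(inc(get_ten())) = (10 + 1) * 2 = 22
```

With a single-element list, rest_of_steps is empty and reduce simply returns the initial data.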
The talk shows a more advanced use of this abstraction: the presenter defines the "operators" fork and reducer. These "operators" allow running pipelines such as the following:
pipeline1 = [f1, fork(f2, f3), f4]
which is equivalent to: [f4(f2(f1())), f4(f3(f1()))]
and
pipeline2 = [f1, fork(f2, f3), f4, reducer(f5)]
which is equivalent to: f5([f4(f2(f1())), f4(f3(f1()))])
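To pin down the intended semantics before implementing anything, the two equivalences can be written out by hand. This is only a sketch of the expected results, not how the library from the talk works; f1 to f5 below are hypothetical placeholder functions:

```python
# Hypothetical placeholder steps, chosen only so the results are easy to check
def f1():
    return 10


def f2(x):
    return x - 1


def f3(x):
    return x + 1


def f4(x):
    return x * 2


def f5(values):
    return sum(values)


# pipeline1 = [f1, fork(f2, f3), f4]: one branch per forked function, f4 applied to each
data = f1()
expected1 = [f4(branch(data)) for branch in (f2, f3)]
print(expected1)  # [18, 22]

# pipeline2 = [f1, fork(f2, f3), f4, reducer(f5)]: f5 collapses the branches into one value
expected2 = f5(expected1)
print(expected2)  # 40
```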
I tried to solve this using functional programming, but I simply can't. I don't know whether fork and reducer should be decorators (and if so, how do I pass them the list of functions that follow?), or whether I should transform the list into a graph using objects, or use coroutines (maybe all of this is nonsense). I am simply utterly confused.
Could someone help me frame this using Python and functional programming?
NOTE: In the video he also talks about observers and executors. For this exercise I don't care about them.
Upvotes: 4
Views: 2324
Reputation: 9868
Although this library is intended to facilitate FP in Python, it's not clear whether the library itself should be written using lots of FP.
This is one way to implement it, using classes (based on the list type) to tell the pipe function whether it needs to fork or reduce, and whether it is dealing with a single data item or a list of items.
This makes some limited use of FP-style techniques, such as the recursive calls to apply_func (allowing multiple forks within a pipeline).
from functools import reduce  # reduce is not a builtin in Python 3


class Forked(list):
    """ Contains a list of data after forking """


class Fork(list):
    """ Contains a list of functions for forking """


class Reducer(object):
    """ Contains a function for reducing forked data """
    def __init__(self, func):
        self.func = func


def fork(*funcs):
    return Fork(funcs)


def reducer(func):
    """ Return a reducer form based on a function that accepts a
    Forked list as its first argument """
    return Reducer(func)


def apply_func(data, func):
    """ Apply a function to data which may be forked """
    if isinstance(data, Forked):
        return Forked(apply_func(datum, func) for datum in data)
    else:
        return func(data)


def apply_form(data, form):
    """ Apply a pipeline form (which may be a function, fork, or reducer)
    to the data """
    if callable(form):
        return apply_func(data, form)
    elif isinstance(form, Fork):
        return Forked(apply_func(data, func) for func in form)
    elif isinstance(form, Reducer):
        return form.func(data)


def pipe(data, *forms):
    """ Apply a pipeline of function forms to data """
    return reduce(apply_form, forms, data)
Examples of this in use:
def double(x): return x * 2
def inc(x): return x + 1
def dec(x): return x - 1
def mult(L): return L[0] * L[1]

print(pipe(10, inc, double))  # 22
print(pipe(10, fork(dec, inc), double))  # [18, 22]
print(pipe(10, fork(dec, inc), double, reducer(mult)))  # 396
EDIT: This can also be simplified a bit further by making fork a function that returns a function, and reducer a class that creates objects mimicking a function. Then the separate Fork and Reducer classes are no longer needed.
from functools import reduce  # reduce is not a builtin in Python 3


class Forked(list):
    """ Contains a list of data after forking """


def fork(*funcs):
    """ Return a function that will take data and output a forked
    list of results of putting the data through several functions """
    def inner(data):
        return Forked(apply_form(data, func) for func in funcs)
    return inner


class reducer(object):
    """ Callable wrapper marking a function that consumes a Forked list """
    def __init__(self, func):
        self.func = func

    def __call__(self, data):
        return self.func(data)


def apply_form(data, form):
    """ Apply a function or reducer to data which may be forked """
    if isinstance(data, Forked) and not isinstance(form, reducer):
        return Forked(apply_form(datum, form) for datum in data)
    else:
        return form(data)


def pipe(data, *forms):
    """ Apply a pipeline of function forms to data """
    return reduce(apply_form, forms, data)
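To connect this back to the list-based form in the question, where the first element produces the data, a thin wrapper around pipe should be enough. This is a sketch, assuming Python 3; the simplified definitions are repeated so the block runs on its own, and run_pipeline, get_ten and the other step functions are hypothetical names used only for illustration:

```python
from functools import reduce


class Forked(list):
    """Contains a list of data after forking."""


def fork(*funcs):
    def inner(data):
        return Forked(apply_form(data, func) for func in funcs)
    return inner


class reducer(object):
    def __init__(self, func):
        self.func = func

    def __call__(self, data):
        return self.func(data)


def apply_form(data, form):
    if isinstance(data, Forked) and not isinstance(form, reducer):
        return Forked(apply_form(datum, form) for datum in data)
    return form(data)


def pipe(data, *forms):
    return reduce(apply_form, forms, data)


def run_pipeline(pipeline):
    """Hypothetical wrapper: the first step takes no input and supplies the data."""
    get_data, *rest_of_steps = pipeline
    return pipe(get_data(), *rest_of_steps)


# Hypothetical steps, for illustration only
def get_ten(): return 10
def inc(x): return x + 1
def dec(x): return x - 1
def double(x): return x * 2


forked = run_pipeline([get_ten, fork(dec, inc), double])
print(forked)  # [18, 22]

reduced = run_pipeline([get_ten, fork(dec, inc), double, reducer(sum)])
print(reduced)  # 40

# A second fork applies to every existing branch, giving a nested Forked list
nested = run_pipeline([get_ten, fork(dec, inc), fork(double, inc)])
print(nested)  # [[18, 10], [22, 12]]
```

Note that a reducer placed after two forks receives the outer Forked list, whose items are themselves Forked lists, so the reducing function has to be written with that nesting in mind.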
Upvotes: 3