Pipelining in Python

Kiuhnm kiuhnm03.4t.yahoo.it
Fri Mar 30 07:27:34 EDT 2012


I decided to withdraw my proposal for streaming programming :) and to fall back to something more conventional.
Here's the full story:
<http://mtomassoli.wordpress.com/2012/03/29/pipelining-in-python/>

The new operators are
  '>>'         which does the pipelining
and
  '-'          which links functions
Pipelining is "function application in reverse order" and linking is "function composition in reverse order".

Therefore,
  arg >> f
means
  f(arg)
and
  arg >> f - g
means
  g(f(arg))

Let's look at some examples:

--->
1)
range(0,50) >> filter(lambda i : i%2) >> map(lambda i : i*i) >> my_print

(Yes, that's currying.)

2)
compFunc = filter(lambda i : i%2) - map(lambda i : i*i)
range(0,50) >> compFunc >> my_print

3) (Sieve of Eratosthenes)
# Tells whether x is not a proper multiple of n.
notPropMult = cur(lambda n, x : x <= n or x % n, 2)

def findPrimes(upTo):
    if (upTo <= 5): return [2, 3, 5]
    filterAll = (findPrimes(floor(sqrt(upTo)))
                 >> map(lambda x : filter(notPropMult(x)))
                 >> reduce(lambda f, g : f - g))
    return list(range(2, upTo + 1)) >> filterAll

findPrimes(1000) >> my_print

4) (Finds the approximate number of hrefs in a web page)

def do(proc, arg):
   proc()
   return arg
do = cur(do)

cprint = cur(print)

("http://python.org"
 >> do(cprint("The page http://python.org has about... ", end = ''))
 >> do(sys.stdout.flush)
 >> urlopen
 >> cur(lambda x : x.read())
 >> findall(b"href=\"")
 >> cur(len)
 >> cur("{} hrefs.".format)
 >> cprint)
<---

And here's the complete source code (which includes a class version of /cur/ and the old tests/examples):
--->
class CurriedFunc:
    def __init__(self, func, args = (), kwArgs = {}, unique = True, minArgs = None):
        self.__func = func
        self.__myArgs = args
        self.__myKwArgs = kwArgs
        self.__unique = unique
        self.__minArgs = minArgs

    def __call__(self, *args, **kwArgs):
        if args or kwArgs:                  # some more args!
            # Allocates data to assign to the next CurriedFunc.
            newArgs = self.__myArgs + args
            newKwArgs = dict.copy(self.__myKwArgs)

            # If unique is True, we don't want repeated keyword arguments.
            if self.__unique and not kwArgs.keys().isdisjoint(newKwArgs):
                raise ValueError("Repeated kw arg while unique = True")

            # Adds/updates keyword arguments.
            newKwArgs.update(kwArgs)

            # Checks whether it's time to evaluate func.
            if self.__minArgs is not None and self.__minArgs <= len(newArgs) + len(newKwArgs):
                return self.__func(*newArgs, **newKwArgs)       # time to evaluate func
            else:
                return CurriedFunc(self.__func, newArgs, newKwArgs, self.__unique, self.__minArgs)
        else:                               # the evaluation was forced
            return self.__func(*self.__myArgs, **self.__myKwArgs)

    def __rrshift__(self, arg):
        return self.__func(*(self.__myArgs + (arg,)), **self.__myKwArgs)      # forces evaluation

    def __sub__(self, other):
        if not isinstance(other, CurriedFunc):
            raise TypeError("Cannot compose a CurriedFunc with another type")

        def compFunc(*args, **kwArgs):
            return other.__func(*(other.__myArgs + (self.__func(*args, **kwArgs),)),
                                **other.__myKwArgs)

        return CurriedFunc(compFunc, self.__myArgs, self.__myKwArgs,
                           self.__unique, self.__minArgs)

def cur(f, minArgs = None):
    return CurriedFunc(f, (), {}, True, minArgs)

def curr(f, minArgs = None):
    return CurriedFunc(f, (), {}, False, minArgs)

# Simple Function.
def func(a, b, c, d, e, f, g = 100):
    print(a, b, c, d, e, f, g)

# NOTE: '<====' means "this line prints to the screen".

# Example 1.
f = cur(func)                   # f is a "curried" version of func
c1 = f(1)
c2 = c1(2, d = 4)               # Note that c is still unbound
c3 = c2(3)(f = 6)(e = 5)        # now c = 3
c3()                            # () forces the evaluation              <====
                                #   it prints "1 2 3 4 5 6 100"
c4 = c2(30)(f = 60)(e = 50)     # now c = 30
c4()                            # () forces the evaluation              <====
                                #   it prints "1 2 30 4 50 60 100"

print("\n------\n")

# Example 2.
f = curr(func)                  # f is a "curried" version of func
                                # curr = cur with possibly repeated
                                #   keyword args
c1 = f(1, 2)(3, 4)
c2 = c1(e = 5)(f = 6)(e = 10)() # ops... we repeated 'e' because we     <====
                                #   changed our mind about it!
                                #   again, () forces the evaluation
                                #   it prints "1 2 3 4 10 6 100"

print("\n------\n")

# Example 3.
f = cur(func, 6)        # forces the evaluation after 6 arguments
c1 = f(1, 2, 3)         # num args = 3
c2 = c1(4, f = 6)       # num args = 5
c3 = c2(5)              # num args = 6 ==> evalution                    <====
                        #   it prints "1 2 3 4 5 6 100"
c4 = c2(5, g = -1)      # num args = 7 ==> evaluation                   <====
                        #   we can specify more than 6 arguments, but
                        #   6 are enough to force the evaluation
                        #   it prints "1 2 3 4 5 6 -1"

print("\n------\n")

# Example 4.
def printTree(func, level = None):
    if level is None:
        printTree(cur(func), 0)
    elif level == 6:
        func(g = '')()      # or just func('')()
    else:
        printTree(func(0), level + 1)
        printTree(func(1), level + 1)

printTree(func)

print("\n------\n")

def f2(*args):
    print(", ".join(["%3d"%(x) for x in args]))

def stress(f, n):
    if n: stress(f(n), n - 1)
    else: f()               # enough is enough

stress(cur(f2), 100)

# Pipelining and Function Composition
print("\n--- Pipelining & Composition ---\n")

import sys
from urllib.request import urlopen
from re import findall
from math import sqrt, floor
from functools import reduce

map = cur(map, 2)
filter = cur(filter, 2)
urlopen = cur(urlopen)
findall = cur(findall)
my_print = cur(lambda list : print(*list))
reduce = cur(reduce, 2)

# Example 5

range(0,50) >> filter(lambda i : i%2) >> map(lambda i : i*i) >> my_print

print("---")

# Example 6

compFunc = filter(lambda i : i%2) - map(lambda i : i*i)
range(0,50) >> compFunc >> my_print

print("---")

# Example 7

# Tells whether x is not a proper multiple of n.
notPropMult = cur(lambda n, x : x <= n or x % n, 2)

def findPrimes(upTo):
    if (upTo <= 5): return [2, 3, 5]
    filterAll = (findPrimes(floor(sqrt(upTo)))
                 >> map(lambda x : filter(notPropMult(x)))
                 >> reduce(lambda f, g : f - g))
    return list(range(2, upTo + 1)) >> filterAll

findPrimes(1000) >> my_print

print("---")

# Example 8
# Finds the approximate number of hrefs in a web page.

def do(proc, arg):
   proc()
   return arg
do = cur(do)

cprint = cur(print)

("http://python.org"
 >> do(cprint("The page http://python.org has about... ", end = ''))
 >> do(sys.stdout.flush)
 >> urlopen
 >> cur(lambda x : x.read())
 >> findall(b"href=\"")
 >> cur(len)
 >> cur("{} hrefs.".format)
 >> cprint)
<---

Kiuhnm



More information about the Python-list mailing list