Infinite lists and generators

Luigi Ballabio ballabio at mac.com
Wed Nov 14 05:09:02 EST 2001


Hi all,
	I've been reading a few back issues of the Perl Journal, and I came across 
an article on implementing infinite lists in Perl. Just for fun I went 
ahead and reimplemented it in Python---I know someone probably did already, 
but as I said, it was for fun. Also, I've written a test suite which 
showcases a few tricks such as building the list of all prime numbers, of 
the Hamming numbers, and the power series of exp, sin and cos. I'm 
including the file below.

My question is: how does one go about pulling such stunts with generators 
instead?

Have fun,
		Luigi


P.S. Here's the file:

#------------- Stream.py --------------

"""
Infinite lists as explained in
Mark-Jason Dominus, "Infinite lists",
The Perl Journal Vol.2 No.3
http://www.samag.com/documents/s=1278/sam02030005/
"""

class Stream:
     """Linked list with a lazily evaluated, memoized tail.

     A stream is a head value plus either None (end of the stream) or a
     promise, i.e. a callable taking no arguments which returns the rest
     of the stream (another Stream, or None).
     """
     def __init__(self,head,tail):
         """head must be an evaluated datum;
            tail must be either None or a promise to
            evaluate the rest of the stream, i.e.,
            a callable which takes no arguments and
            returns the rest of the stream or None."""
         self.h = head
         # Holds the promise until tail() is first called, and the
         # evaluated rest of the stream afterwards.
         self.t = tail
     def head(self):
         "returns the first element of the stream"
         return self.h
     def tail(self):
         """returns the rest of the stream (a Stream or None),
            evaluating the promise on first use"""
         # Accept any callable as a promise, not just lambdas/plain
         # functions: bound methods, builtins and objects defining
         # __call__ are legitimate zero-argument promises too.
         # Stream instances themselves are not callable, so an already
         # evaluated tail is never mistaken for a promise.
         if callable(self.t):
             # evaluate the promise and memoize it
             self.t = self.t()
         return self.t

def integersFrom(n):
     "Infinite stream n, n+1, n+2, ... of successive integers"
     # The start value is bound as a default argument so that the
     # promise does not depend on nested scopes.
     def successors(start=n):
         return integersFrom(start+1)
     return Stream(n,successors)

def take(n,s):
     """returns a list of the first n elements of s

        s is a stream (any object with head() and tail() methods)
        or None for the empty stream. Fewer than n elements are
        returned if the stream ends early; an empty list is returned
        if n is zero or negative or if s is empty."""
     l = []
     while s and n > 0:
         l.append(s.head())
         n = n-1
         # Only force the next promise when another element is really
         # wanted: evaluating it may be expensive, or may never return
         # (e.g. a filtered stream with no further matches).
         if n > 0:
             s = s.tail()
     return l

# Keep a reference to the builtin before shadowing it below.
_old_map = map
def map(f,*sn):
     """returns the stream of the f(x1,x2...) where the x are
        the elements of the input streams s1, s2...

        The resulting stream ends (None is returned) as soon as any
        of the input streams ends, so finite streams can be mapped
        as well as infinite ones."""
     # None marks the end of a stream; stop at the shortest input
     # instead of trying to take the head of None.
     for s in sn:
         if not s:
             return None
     heads = [s.head() for s in sn]
     # f and sn are bound as default arguments so that the promise
     # does not depend on nested scopes.
     return Stream(f(*heads),
                   lambda f=f,sn=sn: map(f,*[s.tail() for s in sn]))

def filter(f,s):
     """returns the stream made of those elements of the input
        stream s for which the predicate f is true, or None if
        the input ends before any such element is found"""
     while s:
         candidate = s.head()
         if f(candidate):
             # first match found: the remaining matches are looked
             # for only when the tail of the result is requested
             return Stream(candidate,
                           lambda f=f,s=s: filter(f,s.tail()))
         s = s.tail()
     return None

def iterate(f,x):
     "returns the stream x, f(x), f(f(x)), ... of repeated applications of f"
     # f(x) is computed only when the tail is requested; f and x are
     # bound as default arguments so no nested scopes are needed.
     def rest(f=f,x=x):
         return iterate(f,f(x))
     return Stream(x,rest)

def merge(s1,s2,less = lambda x,y:x<y):
     """merges the streams s1 and s2 into a single stream whose
        elements are in ascending order according to the given
        'less' predicate; when the two current heads compare as
        equivalent, only one of them is kept"""
     # an exhausted (None) stream leaves the other one untouched
     if not s1:
         return s2
     if not s2:
         return s1
     a = s1.head()
     b = s2.head()
     if less(a,b):
         # s1 is ahead: emit its head and advance s1 only
         first = a
         rest = lambda s1=s1,s2=s2,less=less: merge(s1.tail(),s2,less)
     elif less(b,a):
         # s2 is ahead: emit its head and advance s2 only
         first = b
         rest = lambda s1=s1,s2=s2,less=less: merge(s1,s2.tail(),less)
     else:
         # equivalent heads: emit one and advance both streams
         first = a
         rest = lambda s1=s1,s2=s2,less=less: merge(s1.tail(),s2.tail(),less)
     return Stream(first,rest)


# Test suite (also used to show a few tricks)

import unittest

class StreamTest(unittest.TestCase):
     "Test suite for the Stream module"

     def _sieve(self,s):
         # sieve of Eratosthenes:
         # take the first element of s, remove from the stream
         # its multiples, and recursively do the same starting from
         # the element which is now second
         if s is None:
             return s
         else:
             h = s.head()
             return Stream(h,
                           # promise to evaluate the rest of the stream;
                           # self is passed as a default argument so the
                           # promise does not depend on nested scopes
                           lambda h=h,s=s,dummy=self:
                                  dummy._sieve(filter(lambda x,h=h: x%h!= 0,
                                                      s.tail())))

     def _partial_sum(self,n,s):
         # returns the sum of the first n elements of the stream s
         total = 0.0
         for term in take(n,s):
             total = total + term
         return total

     def _check_close(self,name,x,approx,exact,tolerance=1e-7):
         # fails the running test, with a message showing both values,
         # if approx differs from exact by more than the tolerance
         if abs(approx-exact) > tolerance:
             self.fail("%s(%r): series gives %r, expected %r"
                       % (name,x,approx,exact))

     def testPrimes(self):
         "Testing prime number stream"
         # build the stream of the prime numbers
         primes = self._sieve(integersFrom(2))
         # check that the first 10 are correct
         # (assertEqual shows both lists if they differ)
         self.assertEqual(take(10,primes), [2,3,5,7,11,13,17,19,23,29])

     def testHamming(self):
         "Testing Hamming number stream"
         # find the stream of the Hamming numbers, i.e., the
         # numbers that have no prime factors besides 2, 3, and 5.
         # The rationale of this function is that if n is a Hamming
         # number and is divisible by 2, then it must be the product
         # of 2 and another Hamming number (the same holds for 3 and 5).
         # Therefore, the stream of the Hamming numbers can be built
         # recursively by merging itself multiplied by 2, by 3 and by 5.

         # forward-define the stream so that it can be referred to
         # in the call to merge. The first Hamming number is 1.
         hammings = Stream(1,None)
         # now reassign the (promise of) rest of the stream to the tail
         hammings.t = lambda s=hammings: merge(map(lambda x:x*2, s),
                                               merge(map(lambda x:x*3, s),
                                                     map(lambda x:x*5, s)))
         self.assertEqual(take(20,hammings),
                          [1,2,3,4,5,6,8,9,10,12,15,16,18,20,24,25,27,30,32,36])

     # Example from SICP begins here
     # Here's a reason why it is called the Wizard book
     # We're going to create power series out of thin air...

     def _integrate_series(self,s):
         # given a power series a0 + a1 x + a2 x^2 + ...,
         # the power series of its integral is
         # c + a0 x + (1/2)a1 x^2 + (1/3)a2 x^3 + ...
         # This returns the stream a0, (1/2)a1, (1/3)a2, ... only;
         # the constant c is supplied by the caller as the head of
         # the integrated series.
         # float(y) keeps integer coefficients from being
         # floor-divided on interpreters where int/int truncates.
         return map(lambda x,y:x/float(y),s,integersFrom(1))

     def testSeries(self):
         "Testing power series"
         import math
         # 1) the exponential function coincides with its integral, i.e.,
         #    they have the same series. The first element is 1.
         exp_series = Stream(1.0,None)
         exp_series.t = lambda s=exp_series, dummy=self: \
                               dummy._integrate_series(s)

         # 2) the integral of the cosine is the sine; the integral of
         #    the sine is -cosine.
         sin_series = Stream(0.0,None)
         cos_series = Stream(1.0,None)
         sin_series.t = lambda s=cos_series, dummy=self: \
                               dummy._integrate_series(s)
         cos_series.t = lambda s=sin_series, dummy=self: \
                               map(lambda x:-x,
                                   dummy._integrate_series(s))

         # test that we can calculate the functions by using power series
         for x in [0.5, 1.0, 2.0]:
             # get the stream of 1, x, x^2, x^3, ...
             powers_of_x = iterate(lambda y,c=x:y*c, 1.0)
             # multiply the series by the corresponding powers of x
             exp_terms = map(lambda a,b:a*b, exp_series, powers_of_x)
             sin_terms = map(lambda a,b:a*b, sin_series, powers_of_x)
             cos_terms = map(lambda a,b:a*b, cos_series, powers_of_x)
             # sum the first 15 terms and compare with f(x)
             exp_x = self._partial_sum(15,exp_terms)
             sin_x = self._partial_sum(15,sin_terms)
             cos_x = self._partial_sum(15,cos_terms)
             self._check_close("exp",x,exp_x,math.exp(x))
             self._check_close("sin",x,sin_x,math.sin(x))
             self._check_close("cos",x,cos_x,math.cos(x))


def test():
     """Run the module test suite

        Collects every StreamTest method whose name starts with
        'test', runs them with a text runner and returns the result
        object produced by the runner."""
     import sys
     suite = unittest.TestSuite()
     # the default loader picks up the 'test*' methods, which is what
     # the deprecated unittest.makeSuite(StreamTest,'test') used to do
     suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(StreamTest))
     # the verbosity argument is only passed to the runner on
     # Python 2.1 final and later
     if sys.hexversion >= 0x020100f0:
         runner = unittest.TextTestRunner(verbosity=2)
     else:
         runner = unittest.TextTestRunner()
     return runner.run(suite)

# Run the test suite when this file is executed as a script
# rather than imported as a module.
if __name__ == '__main__':
     test()





More information about the Python-list mailing list