[pypy-svn] r56483 - in pypy/dist/pypy/module/itertools: . test

adurdin at codespeak.net adurdin at codespeak.net
Sat Jul 12 11:54:50 CEST 2008


Author: adurdin
Date: Sat Jul 12 11:54:50 2008
New Revision: 56483

Modified:
   pypy/dist/pypy/module/itertools/__init__.py
   pypy/dist/pypy/module/itertools/interp_itertools.py
   pypy/dist/pypy/module/itertools/test/test_itertools.py
Log:
(adurdin, jlg)
- interp_itertools: groupby implemented, but not yet correct
- interp_itertools tests on exception string only look for the important data


Modified: pypy/dist/pypy/module/itertools/__init__.py
==============================================================================
--- pypy/dist/pypy/module/itertools/__init__.py	(original)
+++ pypy/dist/pypy/module/itertools/__init__.py	Sat Jul 12 11:54:50 2008
@@ -29,6 +29,7 @@
         'count'         : 'interp_itertools.W_Count',
         'cycle'         : 'interp_itertools.W_Cycle',
         'dropwhile'     : 'interp_itertools.W_DropWhile',
+        'groupby'       : 'interp_itertools.W_GroupBy',
         'ifilter'       : 'interp_itertools.W_IFilter',
         'ifilterfalse'  : 'interp_itertools.W_IFilterFalse',
         'imap'          : 'interp_itertools.W_IMap',

Modified: pypy/dist/pypy/module/itertools/interp_itertools.py
==============================================================================
--- pypy/dist/pypy/module/itertools/interp_itertools.py	(original)
+++ pypy/dist/pypy/module/itertools/interp_itertools.py	Sat Jul 12 11:54:50 2008
@@ -718,3 +718,142 @@
         __iter__ = interp2app(W_TeeIterable.iter_w, unwrap_spec=['self']),
         next     = interp2app(W_TeeIterable.next_w, unwrap_spec=['self']))
 W_TeeIterable.typedef.acceptable_as_base_class = False
+
+
+class W_GroupBy(Wrappable):
+
+    def __init__(self, space, w_iterable, w_fun):
+        self.space = space
+        self.w_iterable = self.space.iter(w_iterable)
+        self.identity_fun = self.space.is_w(w_fun, self.space.w_None)
+        self.w_fun = w_fun
+        self.index = 0
+        self.lookahead = False
+        self.exhausted = False
+        self.started = False
+        self.group_edge = True
+        self.w_lookahead = self.space.w_None
+        self.w_key = self.space.w_None
+
+    def iter_w(self):
+        return self.space.wrap(self)
+
+    def next_w(self):
+        if self.exhausted:
+            raise OperationError(self.space.w_StopIteration, self.space.w_None)
+
+        if not self.started:
+            self.started = True
+            try:
+                w_obj = self.space.next(self.w_iterable)
+            except OperationError, e:
+                if e.match(self.space, self.space.w_StopIteration):
+                    self.exhausted = True
+                raise
+            else:
+                self.w_lookahead = w_obj
+                if self.identity_fun:
+                    self.w_key = w_obj
+                else:
+                    self.w_key = self.space.call_function(self.w_fun, w_obj)
+                self.lookahead = True
+
+        if not self.group_edge:
+            # Consume unwanted input until we reach the next group
+            try:
+                while True:
+                    self.group_next(self.index)
+            except StopIteration:
+                pass
+            if self.exhausted:
+                raise OperationError(self.space.w_StopIteration, self.space.w_None)
+        w_iterator = self.space.wrap(W_GroupByIterator(self.space, self.index, self))
+        return self.space.newtuple([self.w_key, w_iterator])
+
+    def group_next(self, group_index):
+        self.group_edge = False
+        if group_index < self.index:
+            raise StopIteration
+        else:
+            if self.lookahead:
+                self.lookahead = False
+                return self.w_lookahead
+
+            try:
+                w_obj = self.space.next(self.w_iterable)
+            except OperationError, e:
+                if e.match(self.space, self.space.w_StopIteration):
+                    self.exhausted = True
+                    raise StopIteration
+                else:
+                    raise
+            else:
+                if self.identity_fun:
+                    w_new_key = w_obj
+                else:
+                    w_new_key = self.space.call_function(self.w_fun, w_obj)
+                if self.space.eq_w(self.w_key, w_new_key):
+                    return w_obj
+                else:
+                    self.index += 1
+                    self.w_lookahead = w_obj
+                    self.w_key = w_new_key
+                    self.lookahead = True
+                    self.group_edge = True
+                    raise StopIteration
+
+def W_GroupBy___new__(space, w_subtype, w_iterable, w_fun=None):
+    return space.wrap(W_GroupBy(space, w_iterable, w_fun))
+
+W_GroupBy.typedef = TypeDef(
+        'groupby',
+        __new__  = interp2app(W_GroupBy___new__, unwrap_spec=[ObjSpace, W_Root, W_Root, W_Root]),
+        __iter__ = interp2app(W_GroupBy.iter_w, unwrap_spec=['self']),
+        next     = interp2app(W_GroupBy.next_w, unwrap_spec=['self']),
+        __doc__  = """Make an iterator that returns consecutive keys and groups from the
+    iterable. The key is a function computing a key value for each
+    element. If not specified or is None, key defaults to an identity
+    function and returns the element unchanged. Generally, the
+    iterable needs to already be sorted on the same key function.
+
+    The returned group is itself an iterator that shares the
+    underlying iterable with groupby(). Because the source is shared,
+    when the groupby object is advanced, the previous group is no
+    longer visible. So, if that data is needed later, it should be
+    stored as a list:
+
+       groups = []
+       uniquekeys = []
+       for k, g in groupby(data, keyfunc):
+           groups.append(list(g))      # Store group iterator as a list
+           uniquekeys.append(k)
+    """)
+W_GroupBy.typedef.acceptable_as_base_class = False
+
+class W_GroupByIterator(Wrappable):
+    def __init__(self, space, index, groupby):
+        self.space = space
+        self.index = index
+        self.groupby = groupby
+        self.exhausted = False
+
+    def iter_w(self):
+        return self.space.wrap(self)
+
+    def next_w(self):
+        if self.exhausted:
+            raise OperationError(self.space.w_StopIteration, self.space.w_None)
+
+        try:
+            w_obj = self.groupby.group_next(self.index)
+        except StopIteration:
+            self.exhausted = True
+            raise OperationError(self.space.w_StopIteration, self.space.w_None)
+        else:
+            return w_obj
+
+W_GroupByIterator.typedef = TypeDef(
+        '_groupby',
+        __iter__ = interp2app(W_GroupByIterator.iter_w, unwrap_spec=['self']),
+        next     = interp2app(W_GroupByIterator.next_w, unwrap_spec=['self']))
+W_GroupByIterator.typedef.acceptable_as_base_class = False

Modified: pypy/dist/pypy/module/itertools/test/test_itertools.py
==============================================================================
--- pypy/dist/pypy/module/itertools/test/test_itertools.py	(original)
+++ pypy/dist/pypy/module/itertools/test/test_itertools.py	Sat Jul 12 11:54:50 2008
@@ -245,17 +245,18 @@
         raises(StopIteration, it.next)
 
     def test_chain_wrongargs(self):
-        import itertools
+        import itertools, re
         
         raises(TypeError, itertools.chain, None)
         raises(TypeError, itertools.chain, [], None)
-        
+
+        # The error message should indicate which argument was dodgy
         for x in range(10):
             args = [()] * x + [None] + [()] * (9 - x)
             try:
                 itertools.chain(*args)
             except TypeError, e:
-                assert str(e) == "chain argument #%d must support iteration" % (x + 1)
+                assert re.search(r'\b%d\b' % (x + 1), str(e))
             else:
                 fail("TypeError expected")
 
@@ -332,17 +333,18 @@
         assert it1.next() == 5
 
     def test_izip_wrongargs(self):
-        import itertools
+        import itertools, re
         
         # Duplicate python 2.4 behaviour for invalid arguments
         raises(TypeError, itertools.izip, None, 0)
 
+        # The error message should indicate which argument was dodgy
         for x in range(10):
             args = [()] * x + [None] + [()] * (9 - x)
             try:
                 itertools.izip(*args)
             except TypeError, e:
-                assert str(e) == "izip argument #%d must support iteration" % (x + 1)
+                assert re.search(r'\b%d\b' % (x + 1), str(e))
             else:
                 fail("TypeError expected")
 
@@ -403,13 +405,79 @@
                 assert it.next() == x
             raises(StopIteration, it.next)
 
-    def test_iterables_wrongargs(self):
+    def test_tee_wrongargs(self):
         import itertools
         
         raises(TypeError, itertools.tee, 0)
         raises(ValueError, itertools.tee, [], -1)
         raises(TypeError, itertools.tee, [], None)
 
+    def test_groupby(self):
+        import itertools
+        
+        it = itertools.groupby([])
+        raises(StopIteration, it.next)
+
+        it = itertools.groupby([1, 2, 2, 3, 3, 3, 4, 4, 4, 4])
+        for x in [1, 2, 3, 4]:
+            k, g = it.next()
+            assert k == x
+            assert len(list(g)) == x
+            raises(StopIteration, g.next)
+        raises(StopIteration, it.next)
+
+        it = itertools.groupby([0, 1, 2, 3, 4, 5], None)
+        for x in [0, 1, 2, 3, 4, 5]:
+            k, g = it.next()
+            assert k == x
+            assert g.next() == x
+            raises(StopIteration, g.next)
+        raises(StopIteration, it.next)
+
+        it = itertools.groupby([0, 0, 0, 0, 1])
+        k1, g1 = it.next()
+        k2, g2 = it.next()
+        raises(StopIteration, g1.next)
+        assert g2.next() == 1
+        raises(StopIteration, g2.next)
+
+        def half_floor(x):
+            return x // 2
+        it = itertools.groupby([0, 1, 2, 3, 4, 5], half_floor)
+        for x in [0, 1, 2]:
+            k, g = it.next()
+            assert k == x
+            assert half_floor(g.next()) == x
+            assert half_floor(g.next()) == x
+            raises(StopIteration, g.next)
+        raises(StopIteration, it.next)
+
+        # Grouping is not based on key identity
+        class NeverEqual(object):
+            def __eq__(self, other):
+                return False
+        objects = [NeverEqual(), NeverEqual(), NeverEqual()]
+        it = itertools.groupby(objects)
+        for x in objects:
+            print "Trying", x
+            k, g = it.next()
+            assert k is x
+            assert g.next() is x
+            raises(StopIteration, g.next)
+        raises(StopIteration, it.next)
+        
+        # Grouping is based on key equality
+        class AlwaysEqual(object):
+            def __eq__(self, other):
+                return True
+        objects = [AlwaysEqual(), AlwaysEqual(), AlwaysEqual()]
+        it = itertools.groupby(objects)
+        k, g = it.next()
+        assert k is objects[0]
+        for x in objects:
+            assert g.next() is x
+        raises(StopIteration, g.next)
+        raises(StopIteration, it.next)
 
     def test_iterables(self):
         import itertools
@@ -419,6 +487,7 @@
             itertools.count(),
             itertools.cycle([]),
             itertools.dropwhile(bool, []),
+            itertools.groupby([]),
             itertools.ifilter(None, []),
             itertools.ifilterfalse(None, []),
             itertools.imap(None),
@@ -446,6 +515,7 @@
             itertools.count,
             itertools.cycle,
             itertools.dropwhile,
+            itertools.groupby,
             itertools.ifilter,
             itertools.ifilterfalse,
             itertools.imap,
@@ -467,6 +537,7 @@
             itertools.count,
             itertools.cycle,
             itertools.dropwhile,
+            itertools.groupby,
             itertools.ifilter,
             itertools.ifilterfalse,
             itertools.imap,



More information about the Pypy-commit mailing list