[pypy-svn] r39999 - in pypy/dist/pypy/lib: . app_test

afayolle at codespeak.net afayolle at codespeak.net
Tue Mar 6 20:48:55 CET 2007


Author: afayolle
Date: Tue Mar  6 20:48:54 2007
New Revision: 39999

Modified:
   pypy/dist/pypy/lib/aop.py
   pypy/dist/pypy/lib/app_test/sample_aop_code.py
   pypy/dist/pypy/lib/app_test/test_aop.py
Log:
before and after advices supported on execution pointcuts

Modified: pypy/dist/pypy/lib/aop.py
==============================================================================
--- pypy/dist/pypy/lib/aop.py	(original)
+++ pypy/dist/pypy/lib/aop.py	Tue Mar  6 20:48:54 2007
@@ -17,12 +17,25 @@
         self.offset = 0
     def default(self, node):
         print ' '*self.offset+str(node)
-        self.offset += 2
+        self.offset += 4
         for child in node.getChildNodes():
             child.accept(self)
-        self.offset -= 2
+        self.offset -= 4
         return node
 
+    def visitFunction(self, func):
+        print " "*self.offset+'def %s:' % (func.name)
+        self.offset += 4
+        for node in func.code.nodes:
+            node.accept(self)
+        self.offset -= 4
+    def visitReturn(self, ret):
+        print ' '*self.offset+'return ',
+        for child in ret.getChildNodes():
+            child.accept(self)
+
+    
+
 DEBUGGER= Debug()
 
 class Advice(parser.ASTVisitor):
@@ -46,8 +59,8 @@
 
     def visitFunction(self, node):
         if self.pointcut.match(node):
-            self.weave_at_pointcut(node,
-                                   self.pointcut.joinpoint(node))
+            node = self.weave_at_pointcut(node,
+                                          self.pointcut.joinpoint(node))
         return node
 
     def vistClass(self, node):
@@ -56,17 +69,23 @@
         return node
         
 
-def make_aop_call(id):
+def make_aop_call(id, targetname=None, discard=True):
     """return an AST for a call to a woven function
     id is the integer returned when the advice was stored in the registry"""
     p = parser
-    return p.ASTDiscard(p.ASTCallFunc(p.ASTName('__aop__'),
-                                      [p.ASTConst(id),], # arguments
+    arguments = [p.ASTConst(id),]
+    if targetname is not None:
+        arguments.append(p.ASTName(targetname))
+    if discard:
+        returnclass = p.ASTDiscard
+    else:
+        returnclass = p.ASTReturn
+    return returnclass(p.ASTCallFunc(p.ASTName('__aop__'),
+                                      arguments,
                                       None, # *args
                                       None # *kwargs
                                       )
                         )
-                     
 
 def is_aop_call(node):
     p = parser
@@ -78,21 +97,39 @@
     """specify code to be run instead of the pointcut"""
     def weave_at_pointcut(self, node, tjp):
         print "WEAVE around!!!"
-        pass # XXX WRITEME
-
+        p = parser
+        print 'original func:'
+        node.accept(DEBUGGER)
+        print '+++', self.function
+        id = __aop__.register_joinpoint(self.function, tjp)
+        statement = node.code
+        newname = '__aoptarget_%s_%s__' % (node.name, id)
+        newcode = p.ASTStmt([p.ASTFunction(node.decorators,
+                                           newname,
+                                           node.argnames,
+                                           node.defaults,
+                                           node.flags,
+                                           node.w_doc,
+                                           node.code,
+                                           node.lineno),                             
+                             make_aop_call(id, targetname=newname, discard=False),
+                             ])
+        
+        node.decorators = None
+        node.code = newcode
+        print 'weaving produced:'
+        node.accept(DEBUGGER)
+        return node
+    
 class before(Advice):
     """specify code to be run before the pointcut"""
     def weave_at_pointcut(self, node, tjp):
         print "WEAVE before!!!"
         id = __aop__.register_joinpoint(self.function, tjp)
         statement_list = node.code.nodes
-        for idx, stmt in enumerate(statement_list):
-            if is_aop_call(stmt):
-                continue
-            else:
-                break
-        statement_list.insert(idx, make_aop_call(id))
+        statement_list.insert(0, make_aop_call(id))
         node.code.nodes = statement_list
+        return node
         
             
 
@@ -100,7 +137,11 @@
     """specify code to be run after the pointcut"""
     def weave_at_pointcut(self, node, tjp):
         print "WEAVE after!!!"
-        pass # XXX WRITEME
+        id = __aop__.register_joinpoint(self.function, tjp)
+        statement = node.code
+        tryfinally = parser.ASTTryFinally(statement, make_aop_call(id))
+        node.code = tryfinally
+        return node
 
 class introduce(Advice):
     """insert new code in the pointcut
@@ -109,6 +150,7 @@
     def weave_at_pointcut(self, node, tjp):
         print "WEAVE introduce!!!"
         pass # XXX WRITEME
+        return node
 
     
         
@@ -197,38 +239,46 @@
         """return a dynamic pointcut representing places where the pointcut is called"""
         # XXX may be difficult to implement ?
         self.isdynamic = True
+        self.mode = 'call'
         #raise NotImplementedError('call')
         return self
 
     def execution(self):
         """return a dynamic pointcut representing places where the pointcut is executed"""
         self.isdynamic = True
+        self.mode = 'execution'
         return self
 
     def initialization(self):
         """return a dynamic pointcut representing places where the pointcut is instantiated"""
         self.isdynamic = True
+        self.mode = 'initializartion'
         #raise NotImplementedError('initialization')
         return self
     
     def destruction(self):
         """return a dynamic pointcut representing places where the pointcut is destroyed"""
         self.isdynamic = True
+        self.mode = 'destruction'
         #raise NotImplementedError('destruction')
 
         return self
 
     def match(self, astnode):
         # FIXME !!! :-)
-        try:
-            return astnode.name == self.pointcutdef
-        except AttributeError:
-            return False
+        if self.mode == 'execution':
+            try:
+                return astnode.name == self.pointcutdef
+            except AttributeError:
+                return False
 
     def joinpoint(self, node):
         """returns a join point instance for the node"""
         assert self.match(node)
-        return JoinPoint()
+        if self.mode == 'execution':
+##             stnode = parser.STType(node)
+##             target = getattr(stnode.compile(), node.name)
+            return JoinPoint()
 
 ### make these class methods of PointCut ?
 def within(pointcutstring):
@@ -272,6 +322,10 @@
         self._curr_aspect = None
         return ast
 
+    def _clear_all(self):
+        self.advices = []
+        self.joinpoints = {}
+
     def _next_id(self):
         try:
             return self._id
@@ -281,12 +335,15 @@
     def register_joinpoint(self, callable, *args):
         assert self._curr_aspect is not None
         id = self._next_id()
+        print "register joinpoint with id %d" % id
         args = (self._curr_aspect,) + args
         self.joinpoints[id] = callable, args
         return id
 
-    def __call__(self, id):
+    def __call__(self, id, target=None):
         callable, args = self.joinpoints[id]
+        print args
+        args[-1].target = target
         callable(*args)
 
 import __builtin__

Modified: pypy/dist/pypy/lib/app_test/sample_aop_code.py
==============================================================================
--- pypy/dist/pypy/lib/app_test/sample_aop_code.py	(original)
+++ pypy/dist/pypy/lib/app_test/sample_aop_code.py	Tue Mar  6 20:48:54 2007
@@ -1,8 +1,33 @@
+code = """
 def foo(b,c):
+    print 'foo'
     a = 2
     d = bar()
     return b+a+c+d
 
 
 def bar():
+    print 'bar'
     return 42
+
+def baz(b,c):
+    try:
+        return foo(b,c)
+    finally:
+        bar()
+"""
+import os
+import os.path as osp
+
+def _make_filename(name):
+    if not name.endswith('.py'):
+        name += ".py"
+    return osp.join(osp.dirname(__file__), name)
+
+def write_module(name):
+    f = open(_make_filename(name), 'w')
+    f.write(code)
+    f.close()
+
+def clean_module(name):
+    os.unlink(_make_filename(name))

Modified: pypy/dist/pypy/lib/app_test/test_aop.py
==============================================================================
--- pypy/dist/pypy/lib/app_test/test_aop.py	(original)
+++ pypy/dist/pypy/lib/app_test/test_aop.py	Tue Mar  6 20:48:54 2007
@@ -1,13 +1,14 @@
 from pypy.conftest import gettestobjspace
 
+
 class AppTestAop(object):
     def setup_class(cls):
         cls.space = gettestobjspace(**{'objspace.usepycfiles':False})
 
-    def test_init(self):
+    def _test_init(self):
         import aop
 
-    def test_static_dynamic_advice_and_pointcut(self):
+    def _test_static_dynamic_advice_and_pointcut(self):
         from  aop import PointCut, introduce, before, around, after
 
         dyn_pc = PointCut('foo').call()
@@ -27,7 +28,7 @@
             adv = advice(dyn_pc)
             assert adv is not None
 
-    def test_is_aop(self):
+    def _test_is_aop(self):
         from aop import is_aop_call
         import parser
         func = """
@@ -40,24 +41,80 @@
         result = [is_aop_call(n) for n in funcast.code.nodes]
         assert result == [True, False, True]
 
-    def test_simple_aspect(self):
+    def _test_simple_aspect_before_execution(self):
         from  aop import PointCut, Aspect, before
+        from app_test import sample_aop_code
+        __aop__._clear_all()
+        sample_aop_code.write_module('aop_before_execution')
         
         class AspectTest:
             __metaclass__ = Aspect 
             def __init__(self):
                 self.executed = False
             @before(PointCut('foo').execution())
-            def advice_before_excecution(self, tjp):
+            def advice_before_execution(self, tjp):
                 self.executed = True
 
         assert __aop__.advices == []
         aspect = AspectTest()
-        assert __aop__.advices == [(aspect, AspectTest.advice_before_excecution)] 
+        assert __aop__.advices == [(aspect, AspectTest.advice_before_execution)] 
         assert not aspect.executed
 
+        from app_test import aop_before_execution
+        assert  aspect.executed == 0
+        aop_before_execution.foo(1,2)
+        assert aspect.executed == 1
+        sample_aop_code.clean_module('aop_before_execution')
+
+    def _test_simple_aspect_after_execution(self):
+        from  aop import PointCut, Aspect, after
         from app_test import sample_aop_code
+        __aop__._clear_all()
+        sample_aop_code.write_module('aop_after_execution')
+        class AspectTest:
+            __metaclass__ = Aspect 
+            def __init__(self):
+                self.executed = 0
+            @after(PointCut('foo').execution())
+            def advice_after_execution(self, tjp):
+                self.executed += 1
+
+        assert __aop__.advices == []
+        aspect = AspectTest()
+        assert __aop__.advices == [(aspect, AspectTest.advice_after_execution)] 
         assert not aspect.executed
-        sample_aop_code.foo(1,2)
-        assert aspect.executed
+        from app_test import aop_after_execution
+        assert aspect.executed == 0
+        aop_after_execution.foo(1,2)
+        assert aspect.executed == 1
+        sample_aop_code.clean_module('aop_after_execution')
 
+    def test_simple_aspect_around_execution(self):
+        from  aop import PointCut, Aspect, around
+        from app_test import sample_aop_code
+        __aop__._clear_all()
+        sample_aop_code.write_module('aop_around_execution')
+        class AspectTest:
+            __metaclass__ = Aspect 
+            def __init__(self):
+                self.executed_before = 0
+                self.executed_after = 0
+            @around(PointCut('foo').execution())
+            def advice_around_execution(self, tjp):
+                print '>>>in'
+                self.executed_before += 1
+                tjp.proceed()
+                self.executed_after += 1
+                self.result = tjp.result()
+                print '<<<out'
+        
+        aspect = AspectTest()
+        from app_test import aop_around_execution
+        assert aspect.executed_before == 0
+        assert aspect.executed_after == 0
+        aop_around_execution.foo(1,2)
+        assert aspect.executed_before == 1
+        assert aspect.executed_after == 1
+        assert aspect.result == 47
+        sample_aop_code.clean_module('aop_around_execution')
+        



More information about the Pypy-commit mailing list