[pypy-svn] r39999 - in pypy/dist/pypy/lib: . app_test
afayolle at codespeak.net
afayolle at codespeak.net
Tue Mar 6 20:48:55 CET 2007
Author: afayolle
Date: Tue Mar 6 20:48:54 2007
New Revision: 39999
Modified:
pypy/dist/pypy/lib/aop.py
pypy/dist/pypy/lib/app_test/sample_aop_code.py
pypy/dist/pypy/lib/app_test/test_aop.py
Log:
before and after advices supported on execution pointcuts
Modified: pypy/dist/pypy/lib/aop.py
==============================================================================
--- pypy/dist/pypy/lib/aop.py (original)
+++ pypy/dist/pypy/lib/aop.py Tue Mar 6 20:48:54 2007
@@ -17,12 +17,25 @@
self.offset = 0
def default(self, node):
print ' '*self.offset+str(node)
- self.offset += 2
+ self.offset += 4
for child in node.getChildNodes():
child.accept(self)
- self.offset -= 2
+ self.offset -= 4
return node
+ def visitFunction(self, func):
+ print " "*self.offset+'def %s:' % (func.name)
+ self.offset += 4
+ for node in func.code.nodes:
+ node.accept(self)
+ self.offset -= 4
+ def visitReturn(self, ret):
+ print ' '*self.offset+'return ',
+ for child in ret.getChildNodes():
+ child.accept(self)
+
+
+
DEBUGGER= Debug()
class Advice(parser.ASTVisitor):
@@ -46,8 +59,8 @@
def visitFunction(self, node):
if self.pointcut.match(node):
- self.weave_at_pointcut(node,
- self.pointcut.joinpoint(node))
+ node = self.weave_at_pointcut(node,
+ self.pointcut.joinpoint(node))
return node
def vistClass(self, node):
@@ -56,17 +69,23 @@
return node
-def make_aop_call(id):
+def make_aop_call(id, targetname=None, discard=True):
"""return an AST for a call to a woven function
id is the integer returned when the advice was stored in the registry"""
p = parser
- return p.ASTDiscard(p.ASTCallFunc(p.ASTName('__aop__'),
- [p.ASTConst(id),], # arguments
+ arguments = [p.ASTConst(id),]
+ if targetname is not None:
+ arguments.append(p.ASTName(targetname))
+ if discard:
+ returnclass = p.ASTDiscard
+ else:
+ returnclass = p.ASTReturn
+ return returnclass(p.ASTCallFunc(p.ASTName('__aop__'),
+ arguments,
None, # *args
None # *kwargs
)
)
-
def is_aop_call(node):
p = parser
@@ -78,21 +97,39 @@
"""specify code to be run instead of the pointcut"""
def weave_at_pointcut(self, node, tjp):
print "WEAVE around!!!"
- pass # XXX WRITEME
-
+ p = parser
+ print 'original func:'
+ node.accept(DEBUGGER)
+ print '+++', self.function
+ id = __aop__.register_joinpoint(self.function, tjp)
+ statement = node.code
+ newname = '__aoptarget_%s_%s__' % (node.name, id)
+ newcode = p.ASTStmt([p.ASTFunction(node.decorators,
+ newname,
+ node.argnames,
+ node.defaults,
+ node.flags,
+ node.w_doc,
+ node.code,
+ node.lineno),
+ make_aop_call(id, targetname=newname, discard=False),
+ ])
+
+ node.decorators = None
+ node.code = newcode
+ print 'weaving produced:'
+ node.accept(DEBUGGER)
+ return node
+
class before(Advice):
"""specify code to be run before the pointcut"""
def weave_at_pointcut(self, node, tjp):
print "WEAVE before!!!"
id = __aop__.register_joinpoint(self.function, tjp)
statement_list = node.code.nodes
- for idx, stmt in enumerate(statement_list):
- if is_aop_call(stmt):
- continue
- else:
- break
- statement_list.insert(idx, make_aop_call(id))
+ statement_list.insert(0, make_aop_call(id))
node.code.nodes = statement_list
+ return node
@@ -100,7 +137,11 @@
"""specify code to be run after the pointcut"""
def weave_at_pointcut(self, node, tjp):
print "WEAVE after!!!"
- pass # XXX WRITEME
+ id = __aop__.register_joinpoint(self.function, tjp)
+ statement = node.code
+ tryfinally = parser.ASTTryFinally(statement, make_aop_call(id))
+ node.code = tryfinally
+ return node
class introduce(Advice):
"""insert new code in the pointcut
@@ -109,6 +150,7 @@
def weave_at_pointcut(self, node, tjp):
print "WEAVE introduce!!!"
pass # XXX WRITEME
+ return node
@@ -197,38 +239,46 @@
"""return a dynamic pointcut representing places where the pointcut is called"""
# XXX may be difficult to implement ?
self.isdynamic = True
+ self.mode = 'call'
#raise NotImplementedError('call')
return self
def execution(self):
"""return a dynamic pointcut representing places where the pointcut is executed"""
self.isdynamic = True
+ self.mode = 'execution'
return self
def initialization(self):
"""return a dynamic pointcut representing places where the pointcut is instantiated"""
self.isdynamic = True
+ self.mode = 'initializartion'
#raise NotImplementedError('initialization')
return self
def destruction(self):
"""return a dynamic pointcut representing places where the pointcut is destroyed"""
self.isdynamic = True
+ self.mode = 'destruction'
#raise NotImplementedError('destruction')
return self
def match(self, astnode):
# FIXME !!! :-)
- try:
- return astnode.name == self.pointcutdef
- except AttributeError:
- return False
+ if self.mode == 'execution':
+ try:
+ return astnode.name == self.pointcutdef
+ except AttributeError:
+ return False
def joinpoint(self, node):
"""returns a join point instance for the node"""
assert self.match(node)
- return JoinPoint()
+ if self.mode == 'execution':
+## stnode = parser.STType(node)
+## target = getattr(stnode.compile(), node.name)
+ return JoinPoint()
### make these class methods of PointCut ?
def within(pointcutstring):
@@ -272,6 +322,10 @@
self._curr_aspect = None
return ast
+ def _clear_all(self):
+ self.advices = []
+ self.joinpoints = {}
+
def _next_id(self):
try:
return self._id
@@ -281,12 +335,15 @@
def register_joinpoint(self, callable, *args):
assert self._curr_aspect is not None
id = self._next_id()
+ print "register joinpoint with id %d" % id
args = (self._curr_aspect,) + args
self.joinpoints[id] = callable, args
return id
- def __call__(self, id):
+ def __call__(self, id, target=None):
callable, args = self.joinpoints[id]
+ print args
+ args[-1].target = target
callable(*args)
import __builtin__
Modified: pypy/dist/pypy/lib/app_test/sample_aop_code.py
==============================================================================
--- pypy/dist/pypy/lib/app_test/sample_aop_code.py (original)
+++ pypy/dist/pypy/lib/app_test/sample_aop_code.py Tue Mar 6 20:48:54 2007
@@ -1,8 +1,33 @@
+code = """
def foo(b,c):
+ print 'foo'
a = 2
d = bar()
return b+a+c+d
def bar():
+ print 'bar'
return 42
+
+def baz(b,c):
+ try:
+ return foo(b,c)
+ finally:
+ bar()
+"""
+import os
+import os.path as osp
+
+def _make_filename(name):
+ if not name.endswith('.py'):
+ name += ".py"
+ return osp.join(osp.dirname(__file__), name)
+
+def write_module(name):
+ f = open(_make_filename(name), 'w')
+ f.write(code)
+ f.close()
+
+def clean_module(name):
+ os.unlink(_make_filename(name))
Modified: pypy/dist/pypy/lib/app_test/test_aop.py
==============================================================================
--- pypy/dist/pypy/lib/app_test/test_aop.py (original)
+++ pypy/dist/pypy/lib/app_test/test_aop.py Tue Mar 6 20:48:54 2007
@@ -1,13 +1,14 @@
from pypy.conftest import gettestobjspace
+
class AppTestAop(object):
def setup_class(cls):
cls.space = gettestobjspace(**{'objspace.usepycfiles':False})
- def test_init(self):
+ def _test_init(self):
import aop
- def test_static_dynamic_advice_and_pointcut(self):
+ def _test_static_dynamic_advice_and_pointcut(self):
from aop import PointCut, introduce, before, around, after
dyn_pc = PointCut('foo').call()
@@ -27,7 +28,7 @@
adv = advice(dyn_pc)
assert adv is not None
- def test_is_aop(self):
+ def _test_is_aop(self):
from aop import is_aop_call
import parser
func = """
@@ -40,24 +41,80 @@
result = [is_aop_call(n) for n in funcast.code.nodes]
assert result == [True, False, True]
- def test_simple_aspect(self):
+ def _test_simple_aspect_before_execution(self):
from aop import PointCut, Aspect, before
+ from app_test import sample_aop_code
+ __aop__._clear_all()
+ sample_aop_code.write_module('aop_before_execution')
class AspectTest:
__metaclass__ = Aspect
def __init__(self):
self.executed = False
@before(PointCut('foo').execution())
- def advice_before_excecution(self, tjp):
+ def advice_before_execution(self, tjp):
self.executed = True
assert __aop__.advices == []
aspect = AspectTest()
- assert __aop__.advices == [(aspect, AspectTest.advice_before_excecution)]
+ assert __aop__.advices == [(aspect, AspectTest.advice_before_execution)]
assert not aspect.executed
+ from app_test import aop_before_execution
+ assert aspect.executed == 0
+ aop_before_execution.foo(1,2)
+ assert aspect.executed == 1
+ sample_aop_code.clean_module('aop_before_execution')
+
+ def _test_simple_aspect_after_execution(self):
+ from aop import PointCut, Aspect, after
from app_test import sample_aop_code
+ __aop__._clear_all()
+ sample_aop_code.write_module('aop_after_execution')
+ class AspectTest:
+ __metaclass__ = Aspect
+ def __init__(self):
+ self.executed = 0
+ @after(PointCut('foo').execution())
+ def advice_after_execution(self, tjp):
+ self.executed += 1
+
+ assert __aop__.advices == []
+ aspect = AspectTest()
+ assert __aop__.advices == [(aspect, AspectTest.advice_after_execution)]
assert not aspect.executed
- sample_aop_code.foo(1,2)
- assert aspect.executed
+ from app_test import aop_after_execution
+ assert aspect.executed == 0
+ aop_after_execution.foo(1,2)
+ assert aspect.executed == 1
+ sample_aop_code.clean_module('aop_after_execution')
+ def test_simple_aspect_around_execution(self):
+ from aop import PointCut, Aspect, around
+ from app_test import sample_aop_code
+ __aop__._clear_all()
+ sample_aop_code.write_module('aop_around_execution')
+ class AspectTest:
+ __metaclass__ = Aspect
+ def __init__(self):
+ self.executed_before = 0
+ self.executed_after = 0
+ @around(PointCut('foo').execution())
+ def advice_around_execution(self, tjp):
+ print '>>>in'
+ self.executed_before += 1
+ tjp.proceed()
+ self.executed_after += 1
+ self.result = tjp.result()
+ print '<<<out'
+
+ aspect = AspectTest()
+ from app_test import aop_around_execution
+ assert aspect.executed_before == 0
+ assert aspect.executed_after == 0
+ aop_around_execution.foo(1,2)
+ assert aspect.executed_before == 1
+ assert aspect.executed_after == 1
+ assert aspect.result == 47
+ sample_aop_code.clean_module('aop_around_execution')
+
More information about the Pypy-commit
mailing list