[pypy-svn] r16174 - in pypy/dist/pypy/module/thread: . rpython rpython/test test
arigo at codespeak.net
arigo at codespeak.net
Fri Aug 19 19:20:34 CEST 2005
Author: arigo
Date: Fri Aug 19 19:20:21 2005
New Revision: 16174
Added:
pypy/dist/pypy/module/thread/ (props changed)
pypy/dist/pypy/module/thread/__init__.py (contents, props changed)
pypy/dist/pypy/module/thread/app_thread.py (contents, props changed)
pypy/dist/pypy/module/thread/gil.py (contents, props changed)
pypy/dist/pypy/module/thread/os_local.py (contents, props changed)
pypy/dist/pypy/module/thread/os_lock.py (contents, props changed)
pypy/dist/pypy/module/thread/os_thread.py (contents, props changed)
pypy/dist/pypy/module/thread/rpython/ (props changed)
pypy/dist/pypy/module/thread/rpython/__init__.py (contents, props changed)
pypy/dist/pypy/module/thread/rpython/exttable.py (contents, props changed)
pypy/dist/pypy/module/thread/rpython/ll_thread.py (contents, props changed)
pypy/dist/pypy/module/thread/rpython/test/ (props changed)
pypy/dist/pypy/module/thread/rpython/test/__init__.py (contents, props changed)
pypy/dist/pypy/module/thread/rpython/test/test_ll_thread.py (contents, props changed)
pypy/dist/pypy/module/thread/test/ (props changed)
pypy/dist/pypy/module/thread/test/__init__.py
pypy/dist/pypy/module/thread/test/support.py (contents, props changed)
pypy/dist/pypy/module/thread/test/test_local.py (contents, props changed)
pypy/dist/pypy/module/thread/test/test_lock.py (contents, props changed)
pypy/dist/pypy/module/thread/test/test_thread.py (contents, props changed)
pypy/dist/pypy/module/thread/threadlocals.py (contents, props changed)
Log:
The 'thread' module implemented with OS threads and a Global Interpreter Lock
(as in CPython).
Must be selected with --usemodules=thread.
Very occasionally, one test fails. Hard to figure out what's wrong :-(
The GIL itself is further separated from the rest of the code, which is about
using OS threads and locks. It's almost completely isolated in gil.py.
The 'thread._local' class is much nicer than in CPython: thanks to the
getdict() method on all w_ objects, we can do the "right thing" and have
getdict() return a dictionary that depends on the current thread. No need to
patch the __dict__ of the thread._local instance in __getattribute__,
__setattr__ and __delattr__.
Experimentally, the 'thread' module comes with its own rpython support in the
'rpython' submodule: it provides rpython support for interp-level
thread.start_new_thread(), thread.get_ident(), thread.allocate_lock() and lock
objects (the latter using the "external types" of the previous check-in).
Added: pypy/dist/pypy/module/thread/__init__.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/thread/__init__.py Fri Aug 19 19:20:21 2005
@@ -0,0 +1,34 @@
+
+# Package initialisation
+from pypy.interpreter.mixedmodule import MixedModule
+
+class Module(MixedModule):
+    # Names implemented at application level, in app_thread.py.
+    appleveldefs = {
+        'exit': 'app_thread.exit',
+        'exit_thread': 'app_thread.exit', # obsolete synonym
+        'error': 'app_thread.error',
+    }
+
+    # Names implemented at interpreter level, in os_thread.py, os_lock.py
+    # and os_local.py.
+    interpleveldefs = {
+        'start_new_thread': 'os_thread.start_new_thread',
+        'get_ident': 'os_thread.get_ident',
+        'allocate_lock': 'os_lock.allocate_lock',
+        'allocate': 'os_lock.allocate_lock', # obsolete synonym
+        'LockType': 'os_lock.getlocktype(space)',
+        '_local': 'os_local.getlocaltype(space)',
+    }
+
+    def __init__(self, space, *args):
+        "NOT_RPYTHON: patches space.threadlocals to use real threadlocals"
+        from pypy.module.thread import gil
+        MixedModule.__init__(self, space, *args)
+        # Carry the value held by the previous threadlocals object over to
+        # the GIL-enforcing replacement, then install it on the space.
+        saved_value = space.threadlocals.getvalue()
+        gil_locals = gil.GILThreadLocals()
+        gil_locals.setvalue(saved_value)
+        space.threadlocals = gil_locals
+        gil_locals.enter_thread(space) # setup the main thread
Added: pypy/dist/pypy/module/thread/app_thread.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/thread/app_thread.py Fri Aug 19 19:20:21 2005
@@ -0,0 +1,9 @@
+# Application-level definitions of the 'thread' module.
+
+class error(Exception):
+    pass
+
+def exit():
+    """This is synonymous to ``raise SystemExit''. It will cause the current
+thread to exit silently unless the exception is caught."""
+    raise SystemExit
Added: pypy/dist/pypy/module/thread/gil.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/thread/gil.py Fri Aug 19 19:20:21 2005
@@ -0,0 +1,43 @@
+"""
+Global Interpreter Lock.
+"""
+
+# This module adds a global lock to an object space.
+# If multiple threads try to execute simultaneously in this space,
+# all but one will be blocked. The other threads get a chance to run
+# from time to time, using the executioncontext's XXX
+
+import thread
+from pypy.module.thread.threadlocals import OSThreadLocals
+
+
+class GILThreadLocals(OSThreadLocals):
+    """A version of OSThreadLocals that enforces a GIL."""
+
+    def __init__(self):
+        # Use a per-instance table rather than the class-level dictionary
+        # inherited from OSThreadLocals, which would be shared by every
+        # instance (and so by every object space using this module).
+        self._valuedict = {}
+        self.GIL = thread.allocate_lock()
+
+    def enter_thread(self, space):
+        "Notification that the current thread is just starting: grab the GIL."
+        self.GIL.acquire(True)
+        OSThreadLocals.enter_thread(self, space)
+
+    def leave_thread(self, space):
+        "Notification that the current thread is stopping: release the GIL."
+        try:
+            OSThreadLocals.leave_thread(self, space)
+        finally:
+            # Release the GIL even if a thread-exit function raised;
+            # otherwise every other thread would stay blocked forever.
+            self.GIL.release()
+
+    def yield_thread(self):
+        """Notification that the current thread is between two bytecodes:
+        release the GIL for a little while."""
+        self.GIL.release()
+        # Other threads can run here
+        self.GIL.acquire(True)
Added: pypy/dist/pypy/module/thread/os_local.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/thread/os_local.py Fri Aug 19 19:20:21 2005
@@ -0,0 +1,81 @@
+import thread
+from pypy.interpreter.error import OperationError
+from pypy.interpreter.baseobjspace import Wrappable
+from pypy.interpreter.typedef import TypeDef, interp2app
+from pypy.interpreter.typedef import GetSetProperty, descr_get_dict
+from pypy.interpreter.typedef import descr_set_dict
+from pypy.interpreter.gateway import ObjSpace, W_Root, Arguments
+
+
+class Local(Wrappable):
+    """Thread-local data"""
+
+    # self.dicts maps a thread identifier to the wrapped dictionary that
+    # serves as this object's __dict__ in that thread.
+
+    def __init__(self, space, initargs):
+        self.space = space
+        self.initargs = initargs
+        ident = thread.get_ident()
+        self.dicts = {ident: space.newdict([])}
+
+    def getdict(self):
+        # Return the dictionary of the calling thread, creating it -- and
+        # running __init__ with the saved arguments -- on first use.
+        ident = thread.get_ident()
+        try:
+            w_dict = self.dicts[ident]
+        except KeyError:
+            # create a new dict for this thread
+            space = self.space
+            w_dict = self.dicts[ident] = space.newdict([])
+            # call __init__
+            try:
+                w_self = space.wrap(self)
+                w_type = self.getclass(space)
+                w_init = space.getattr(w_type, space.wrap("__init__"))
+                space.call_args(w_init, self.initargs.prepend(w_self))
+            except:
+                # failed, forget w_dict and propagate the exception
+                del self.dicts[ident]
+                raise
+            # ready
+            space.threadlocals.atthreadexit(space, finish_thread, self)
+        return w_dict
+
+    def setdict(self, space, w_dict):
+        # Replace the calling thread's dictionary; only real dicts are
+        # accepted.
+        if not space.is_true(space.isinstance(w_dict, space.w_dict)):
+            raise OperationError(space.w_TypeError,
+                                 space.wrap("setting dictionary to a non-dict"))
+        self.getdict() # force a dict to exist first
+        ident = thread.get_ident()
+        self.dicts[ident] = w_dict
+
+    def descr_local__new__(space, w_subtype, __args__):
+        # NB. used as a plain function through .im_func below: there is
+        # no 'self', the first argument is the object space.
+        # XXX check __args__
+        local = space.allocate_instance(Local, w_subtype)
+        Local.__init__(local, space, __args__)
+        return space.wrap(local)
+
+Local.typedef = TypeDef("thread._local",
+    __doc__ = "Thread-local data",
+    __new__ = interp2app(Local.descr_local__new__.im_func,
+                         unwrap_spec=[ObjSpace, W_Root, Arguments]),
+    __dict__ = GetSetProperty(descr_get_dict,
+                              descr_set_dict, cls=Local),
+    )
+
+def getlocaltype(space):
+    return space.gettypeobject(Local.typedef)
+
+
+def finish_thread(w_obj):
+    # Thread-exit callback registered by Local.getdict(): drops the
+    # dictionary that the finishing thread had in 'w_obj'.
+    assert isinstance(w_obj, Local)
+    ident = thread.get_ident()
+    del w_obj.dicts[ident]
Added: pypy/dist/pypy/module/thread/os_lock.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/thread/os_lock.py Fri Aug 19 19:20:21 2005
@@ -0,0 +1,112 @@
+"""
+Python locks, based on true threading locks provided by the OS.
+"""
+
+import thread
+from pypy.interpreter.error import OperationError
+from pypy.interpreter.baseobjspace import Wrappable
+from pypy.interpreter.gateway import ObjSpace, interp2app
+from pypy.interpreter.typedef import TypeDef
+
+# Force the declaration of the type 'thread.LockType' for RPython
+import pypy.module.thread.rpython.exttable
+
+
+##import sys
+##def debug(msg, n):
+##    return
+##    tb = []
+##    try:
+##        for i in range(1, 8):
+##            tb.append(sys._getframe(i).f_code.co_name)
+##    except:
+##        pass
+##    tb = ' '.join(tb)
+##    msg = '| %6d | %d %s | %s\n' % (thread.get_ident(), n, msg, tb)
+##    sys.stderr.write(msg)
+
+
+class Lock(Wrappable):
+    "A wrappable box around an interp-level lock object."
+
+    def __init__(self):
+        self.lock = thread.allocate_lock()
+
+    def descr_lock_acquire(self, space, waitflag=1):
+        """Lock the lock. Without argument, this blocks if the lock is already
+locked (even by the same thread), waiting for another thread to release
+the lock, and return None once the lock is acquired.
+With an argument, this will only block if the argument is true,
+and the return value reflects whether the lock is acquired.
+The blocking operation is not interruptible."""
+        # XXX Usage of threadlocals.GIL in this function is considered hackish.
+        # Ideally, all GIL knowledge should be in gil.py.
+        mylock = self.lock
+        GIL = space.threadlocals.GIL
+        # Drop the GIL while (possibly) blocking, so that other threads
+        # can run and eventually release 'mylock'.
+        GIL.release()
+        try:
+            result = mylock.acquire(bool(waitflag))
+        finally:
+            # Always take the GIL back, even if acquire() raised: the
+            # code that runs after this call assumes it is still held.
+            GIL.acquire(True)
+        return space.newbool(result)
+
+    def descr_lock_release(self, space):
+        """Release the lock, allowing another thread that is blocked waiting for
+the lock to acquire the lock. The lock must be in the locked state,
+but it needn't be locked by the same thread that unlocks it."""
+        try:
+            self.lock.release()
+        except thread.error:
+            # turn the interp-level error into the app-level thread.error
+            w_module = space.getbuiltinmodule('thread')
+            w_error = space.getattr(w_module, space.wrap('error'))
+            raise OperationError(w_error, space.wrap("release unlocked lock"))
+
+    def descr_lock_locked(self, space):
+        """Return whether the lock is in the locked state."""
+        # probe with a non-blocking acquire, undone at once if it succeeds
+        if self.lock.acquire(False):
+            self.lock.release()
+            return space.w_False
+        else:
+            return space.w_True
+
+
+descr_acquire = interp2app(Lock.descr_lock_acquire,
+                           unwrap_spec=['self', ObjSpace, int])
+descr_release = interp2app(Lock.descr_lock_release,
+                           unwrap_spec=['self', ObjSpace])
+descr_locked = interp2app(Lock.descr_lock_locked,
+                          unwrap_spec=['self', ObjSpace])
+
+Lock.typedef = TypeDef("thread.lock",
+    __doc__ = """\
+A lock object is a synchronization primitive. To create a lock,
+call the thread.allocate_lock() function. Methods are:
+
+acquire() -- lock the lock, possibly blocking until it can be obtained
+release() -- unlock of the lock
+locked() -- test whether the lock is currently locked
+
+A lock is not owned by the thread that locked it; another thread may
+unlock it. A thread attempting to lock a lock that it has already locked
+will block until another thread unlocks it. Deadlocks may ensue.""",
+    acquire = descr_acquire,
+    release = descr_release,
+    locked = descr_locked,
+    # Obsolete synonyms
+    acquire_lock = descr_acquire,
+    release_lock = descr_release,
+    locked_lock = descr_locked,
+    )
+
+
+def allocate_lock(space):
+    return space.wrap(Lock())
+
+def getlocktype(space):
+    return space.gettypeobject(Lock.typedef)
Added: pypy/dist/pypy/module/thread/os_thread.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/thread/os_thread.py Fri Aug 19 19:20:21 2005
@@ -0,0 +1,65 @@
+"""
+Thread support based on OS-level threads.
+"""
+
+import thread
+from pypy.interpreter.error import OperationError
+from pypy.interpreter.gateway import NoneNotWrapped
+from pypy.interpreter.gateway import ObjSpace, W_Root, Arguments
+
+# Force the declaration of thread.start_new_thread() & co. for RPython
+import pypy.module.thread.rpython.exttable
+
+
+class Bootstrapper:
+    # Plain container: start_new_thread() stores 'space', 'w_callable'
+    # and 'args' on an instance, and bootstrap() runs in the new thread.
+
+    def bootstrap(self):
+        space = self.space
+        w_callable = self.w_callable
+        args = self.args
+        space.threadlocals.enter_thread(space)
+        try:
+            try:
+                space.call_args(w_callable, args)
+            except OperationError, operr:
+                # SystemExit just ends the thread; any other exception
+                # is reported through write_unraisable().
+                if not operr.match(space, space.w_SystemExit):
+                    where = 'thread %d started by' % thread.get_ident()
+                    operr.write_unraisable(space, where, w_callable)
+                operr.clear(space)
+        finally:
+            # clean up space.threadlocals to remove the ExecutionContext
+            # entry corresponding to the current thread
+            space.threadlocals.leave_thread(space)
+
+
+def start_new_thread(space, w_callable, w_args, w_kwargs=NoneNotWrapped):
+    """Start a new thread and return its identifier. The thread will call the
+function with positional arguments from the tuple args and keyword arguments
+taken from the optional dictionary kwargs. The thread exits when the
+function returns; the return value is ignored. The thread will also exit
+when the function raises an unhandled exception; a stack trace will be
+printed unless the exception is SystemExit."""
+    # XXX check that w_callable is callable
+    # XXX check that w_args is a tuple
+    # XXX check that w_kwargs is a dict
+    boot = Bootstrapper()
+    boot.space = space
+    boot.w_callable = w_callable
+    boot.args = Arguments.frompacked(space, w_args, w_kwargs)
+    ident = thread.start_new_thread(Bootstrapper.bootstrap, (boot,))
+    return space.wrap(ident)
+
+
+def get_ident(space):
+    """Return a non-zero integer that uniquely identifies the current thread
+amongst other threads that exist simultaneously.
+This may be used to identify per-thread resources.
+Even though on some platforms threads identities may appear to be
+allocated consecutive numbers starting at 1, this behavior should not
+be relied upon, and the number should be seen purely as a magic cookie.
+A thread's identity may be reused for another thread after it exits."""
+    return space.wrap(thread.get_ident())
Added: pypy/dist/pypy/module/thread/rpython/__init__.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/thread/rpython/__init__.py Fri Aug 19 19:20:21 2005
@@ -0,0 +1 @@
+#
Added: pypy/dist/pypy/module/thread/rpython/exttable.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/thread/rpython/exttable.py Fri Aug 19 19:20:21 2005
@@ -0,0 +1,25 @@
+"""
+Annotation support for interp-level lock objects.
+"""
+
+import thread
+from pypy.rpython.extfunctable import declare, declaretype
+
+# Every entry below names its low-level helper as '<module>/<name>'.
+module = 'pypy.module.thread.rpython.ll_thread'
+
+# ____________________________________________________________
+# The external type thread.LockType
+
+declaretype(thread.LockType,
+            "ThreadLock",
+            acquire = (bool, module + '/acquire_lock'),
+            release = (type(None), module + '/release_lock'),
+            )
+
+# ____________________________________________________________
+# Built-in functions needed in the rtyper
+
+declare(thread.start_new_thread, int, module + '/start_new_thread')
+declare(thread.get_ident, int, module + '/get_ident')
+declare(thread.allocate_lock, thread.LockType, module + '/allocate_lock')
Added: pypy/dist/pypy/module/thread/rpython/ll_thread.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/thread/rpython/ll_thread.py Fri Aug 19 19:20:21 2005
@@ -0,0 +1,34 @@
+"""
+Dummy low-level implementations for the external functions of the 'thread'
+module.
+"""
+
+import thread
+from pypy.rpython.module.support import from_rexternalobj, to_rexternalobj
+
+def ll_thread_start_new_thread(funcptr, argtuple):
+    # wrapper around ll_thread_start, to extract the single argument
+    # from the argtuple (which is expected to have exactly one item).
+    return ll_thread_start(funcptr, argtuple.item0)
+
+def ll_thread_start(funcptr, argument):
+    return thread.start_new_thread(funcptr, (argument,))
+ll_thread_start.suggested_primitive = True
+
+def ll_thread_get_ident():
+    return thread.get_ident()
+ll_thread_get_ident.suggested_primitive = True
+
+
+def ll_thread_allocate_lock():
+    return to_rexternalobj(thread.allocate_lock())
+ll_thread_allocate_lock.suggested_primitive = True
+
+def ll_thread_acquire_lock(lockptr, waitflag):
+    return from_rexternalobj(lockptr).acquire(waitflag)
+ll_thread_acquire_lock.suggested_primitive = True
+
+def ll_thread_release_lock(lockptr):
+    from_rexternalobj(lockptr).release()
+ll_thread_release_lock.suggested_primitive = True
Added: pypy/dist/pypy/module/thread/rpython/test/__init__.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/thread/rpython/test/__init__.py Fri Aug 19 19:20:21 2005
@@ -0,0 +1 @@
+#
Added: pypy/dist/pypy/module/thread/rpython/test/test_ll_thread.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/thread/rpython/test/test_ll_thread.py Fri Aug 19 19:20:21 2005
@@ -0,0 +1,25 @@
+import thread
+import pypy.module.thread.rpython.exttable # for declare()/declaretype()
+from pypy.module.thread.rpython.ll_thread import *
+from pypy.translator.annrpython import RPythonAnnotator
+from pypy.rpython.test.test_llinterp import interpret
+
+
+def test_annotate_lock():
+    def fn():
+        return thread.allocate_lock().acquire(False)
+    annotator = RPythonAnnotator()
+    s_result = annotator.build_types(fn, [])
+    # result should be a boolean
+    assert s_result.knowntype == bool
+
+def test_lock():
+    def fn():
+        lock = thread.allocate_lock()
+        ok1 = lock.acquire(True)     # free lock: succeeds
+        ok2 = lock.acquire(False)    # already held: fails without blocking
+        lock.release()
+        ok3 = lock.acquire(False)    # free again: succeeds
+        return ok1 and not ok2 and ok3
+    res = interpret(fn, [])
+    assert res is True
Added: pypy/dist/pypy/module/thread/test/__init__.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/thread/test/__init__.py Fri Aug 19 19:20:21 2005
@@ -0,0 +1 @@
+#
Added: pypy/dist/pypy/module/thread/test/support.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/thread/test/support.py Fri Aug 19 19:20:21 2005
@@ -0,0 +1,30 @@
+import py
+
+class GenericTestThread:
+    # Base class for the app-level thread tests: skips them unless the
+    # 'thread' module is enabled, and provides self.waitfor/self.busywait.
+
+    def setup_class(cls):
+        space = cls.space
+        if "thread" not in space.options.usemodules:
+            py.test.skip("--usemodules=thread option not provided")
+
+        cls.w_waitfor = space.appexec([], """():
+            import time
+            def waitfor(expr, timeout=10.0):
+                limit = time.time() + timeout
+                while time.time() <= limit:
+                    time.sleep(0.005)
+                    if expr():
+                        return
+                print '*** timed out ***'
+            return waitfor
+        """)
+        cls.w_busywait = space.appexec([], """():
+            import time
+            def busywait(t):
+                limit = time.time() + t
+                while time.time() <= limit:
+                    time.sleep(0.005)
+            return busywait
+        """)
Added: pypy/dist/pypy/module/thread/test/test_local.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/thread/test/test_local.py Fri Aug 19 19:20:21 2005
@@ -0,0 +1,75 @@
+from pypy.module.thread.test.support import GenericTestThread
+
+
+class AppTestLocal(GenericTestThread):
+
+    def test_local(self):
+        import thread
+        from thread import _local as tlsobject
+        freed = []
+        class X:
+            def __del__(self):
+                freed.append(1)
+
+        ok = []
+        TLS1 = tlsobject()
+        TLS2 = tlsobject()
+        TLS1.aa = "hello"
+        def f(i):
+            success = False
+            try:
+                a = TLS1.aa = i
+                b = TLS1.bbb = X()
+                c = TLS2.cccc = i*3
+                d = TLS2.ddddd = X()
+                self.busywait(0.05)
+                assert TLS1.aa == a
+                assert TLS1.bbb is b
+                assert TLS2.cccc == c
+                assert TLS2.ddddd is d
+                success = True
+            finally:
+                ok.append(success)
+        # run f() in 20 threads, each storing its own values in TLS1/TLS2
+        for i in range(20):
+            thread.start_new_thread(f, (i,))
+        self.waitfor(lambda: len(ok) == 20, timeout=30.0)
+        assert ok == 20*[True] # see stdout/stderr for failures in the threads
+
+        self.waitfor(lambda: len(freed) >= 40)
+        assert len(freed) == 40
+        # in theory, all X objects should have been freed by now. Note that
+        # Python's own thread._local objects suffer from the very same "bug" that
+        # tls.py showed originally, and leaves len(freed)==38: the last thread's
+        # __dict__ remains stored in the TLS1/TLS2 instances, although it is not
+        # really accessible any more.
+
+        assert TLS1.aa == "hello"
+
+
+    def test_local_init(self):
+        import thread
+        feedback = []
+        seen = {}
+
+        class X(thread._local):
+            def __init__(self, n):
+                assert n == 42
+                self.tag = len(feedback)
+                feedback.append(1)
+
+        x = X(42)
+        assert x.tag == 0
+        assert feedback == [1]
+        # __init__ runs once above and once more in each thread using x
+        def f():
+            seen[x.tag] = True
+        for i in range(5):
+            thread.start_new_thread(f, ())
+        self.waitfor(lambda: len(seen) == 5, timeout=20.0)
+        assert seen == {1: True,
+                        2: True,
+                        3: True,
+                        4: True,
+                        5: True}
+        assert len(feedback) == 6
Added: pypy/dist/pypy/module/thread/test/test_lock.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/thread/test/test_lock.py Fri Aug 19 19:20:21 2005
@@ -0,0 +1,30 @@
+from pypy.module.thread.test.support import GenericTestThread
+
+
+class AppTestLock(GenericTestThread):
+
+    def test_lock(self):
+        import thread
+        lock = thread.allocate_lock()
+        assert type(lock) is thread.LockType
+        assert lock.locked() is False
+        raises(thread.error, lock.release)
+        assert lock.locked() is False
+        lock.acquire()
+        assert lock.locked() is True
+        lock.release()
+        assert lock.locked() is False
+        raises(thread.error, lock.release)
+        assert lock.locked() is False
+        # a second thread releases the lock the main thread then blocks on
+        feedback = []
+        lock.acquire()
+        def f():
+            self.busywait(0.25)
+            feedback.append(42)
+            lock.release()
+        assert lock.locked() is True
+        thread.start_new_thread(f, ())
+        lock.acquire()
+        assert lock.locked() is True
+        assert feedback == [42]
Added: pypy/dist/pypy/module/thread/test/test_thread.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/thread/test/test_thread.py Fri Aug 19 19:20:21 2005
@@ -0,0 +1,129 @@
+from pypy.module.thread.test.support import GenericTestThread
+
+
+class AppTestThread(GenericTestThread):
+
+    def test_start_new_thread(self):
+        import thread
+        feedback = []
+        please_start = []
+        def f(x, y, z):
+            # block until the main thread appends to please_start
+            self.waitfor(lambda: please_start)
+            feedback.append(42)
+        thread.start_new_thread(f, (1, 2), {'z': 3})
+        assert feedback == [] # still empty
+        please_start.append(1) # trigger
+        self.waitfor(lambda: feedback)
+        assert feedback == [42]
+
+    def test_get_ident(self):
+        import thread
+        ident = thread.get_ident()
+        feedback = []
+        def f():
+            feedback.append(thread.get_ident())
+        ident2 = thread.start_new_thread(f, ())
+        assert ident2 != ident
+        assert ident == thread.get_ident()
+        self.waitfor(lambda: feedback)
+        assert feedback == [ident2]
+
+    def test_sys_getframe(self):
+        # this checks that each thread gets its own ExecutionContext.
+        def main():
+            import thread, sys
+            def dump_frames(feedback):
+                f = sys._getframe()
+                for i in range(3):
+                    if f is None:
+                        feedback.append(None)
+                    else:
+                        feedback.append(f.f_code.co_name)
+                        self.busywait(0.04)
+                        assert f is sys._getframe(i)
+                        f = f.f_back
+            def dummyfn(feedback):
+                dump_frames(feedback)
+            feedback = []
+            dummyfn(feedback)
+            assert feedback == ['dump_frames', 'dummyfn', 'main']
+            feedbacks = []
+            for i in range(3):
+                feedback = []
+                thread.start_new_thread(dummyfn, (feedback,))
+                feedbacks.append(feedback)
+            expected = 3*[['dump_frames', 'dummyfn', None]] # without 'main'
+            self.waitfor(lambda: feedbacks == expected)
+            assert feedbacks == expected
+        main()
+
+    def test_thread_exit(self):
+        import thread, sys, StringIO
+        def fn1():
+            thread.exit()
+        def fn2():
+            raise SystemExit
+        def fn3():
+            raise ValueError("hello world")
+        prev = sys.stderr
+        try:
+            sys.stderr = StringIO.StringIO()
+            thread.start_new_thread(fn1, ())
+            thread.start_new_thread(fn2, ())
+            self.busywait(0.2) # time for the threads to finish
+            assert sys.stderr.getvalue() == ''
+
+            sys.stderr = StringIO.StringIO()
+            thread.start_new_thread(fn3, ())
+            self.waitfor(lambda: "ValueError" in sys.stderr.getvalue())
+            result = sys.stderr.getvalue()
+            assert "ValueError" in result
+            assert "hello world" in result
+        finally:
+            sys.stderr = prev
+
+    def test_perthread_excinfo(self):
+        import thread, sys
+        done = []
+        def fn1(n):
+            success = False
+            try:
+                caught = False
+                try:
+                    try:
+                        {}[n]
+                    except KeyError:
+                        self.busywait(0.05)
+                        caught = True
+                        raise
+                except KeyError:
+                    self.busywait(0.05)
+                    assert caught
+                    etype, evalue, etb = sys.exc_info()
+                    assert etype is KeyError
+                    assert evalue.args[0] == n
+                    success = True
+            finally:
+                done.append(success)
+        for i in range(20):
+            thread.start_new_thread(fn1, (i,))
+        self.waitfor(lambda: len(done) == 20)
+        assert done == 20*[True] # see stderr for failures in the threads
+
+    def test_no_corruption(self):
+        import thread
+        lst = []
+        done_marker = []
+        def f(x, done):
+            for j in range(40):
+                lst.insert(0, x+j) # all threads trying to modify the same list
+            done.append(True)
+        for i in range(0, 120, 40):
+            done = []
+            thread.start_new_thread(f, (i, done))
+            done_marker.append(done)
+        for done in done_marker:
+            self.waitfor(lambda: done, timeout=20.0)
+            assert done # see stderr for failures in threads
+        assert sorted(lst) == range(120)
Added: pypy/dist/pypy/module/thread/threadlocals.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/module/thread/threadlocals.py Fri Aug 19 19:20:21 2005
@@ -0,0 +1,56 @@
+import thread
+
+
+class OSThreadLocals:
+    """Thread-local storage for OS-level threads.
+    For memory management, this version depends on explicit notification when
+    a thread finishes. This works as long as the thread was started by
+    os_thread.bootstrap()."""
+
+    # Fallback for subclasses that define __init__ without calling ours.
+    _valuedict = {} # {thread_ident: ExecutionContext()}
+
+    def __init__(self):
+        # One table per instance, so that separate instances do not
+        # overwrite each other's entry for the same thread.
+        self._valuedict = {}
+
+    def getvalue(self):
+        ident = thread.get_ident()
+        return self._valuedict.get(ident, None)
+
+    def setvalue(self, value):
+        ident = thread.get_ident()
+        self._valuedict[ident] = value
+
+    def enter_thread(self, space):
+        "Notification that the current thread is just starting."
+        ec = space.getexecutioncontext()
+        ec.thread_exit_funcs = []
+
+    def leave_thread(self, space):
+        "Notification that the current thread is about to stop."
+        try:
+            ec = space.getexecutioncontext()
+            while ec.thread_exit_funcs:
+                exit_func, w_obj = ec.thread_exit_funcs.pop()
+                exit_func(w_obj)
+        finally:
+            # always forget this thread's entry, even if an exit function
+            # raised
+            ident = thread.get_ident()
+            try:
+                del self._valuedict[ident]
+            except KeyError:
+                pass
+
+    def yield_thread(self):
+        """Notification that the current thread is between two bytecodes
+        (so that it's a good time to yield some time to other threads)."""
+
+    def atthreadexit(self, space, exit_func, w_obj):
+        # Register exit_func(w_obj) to be called by leave_thread() when
+        # the current thread finishes.
+        ec = space.getexecutioncontext()
+        ec.thread_exit_funcs.append((exit_func, w_obj))
More information about the Pypy-commit
mailing list