[pypy-commit] pypy stmgc-c8: Keep the threads around for the next call to run()

arigo noreply at buildbot.pypy.org
Sun Jun 21 09:27:39 CEST 2015


Author: Armin Rigo <arigo at tunes.org>
Branch: stmgc-c8
Changeset: r78218:e92765580f57
Date: 2015-06-20 07:40 +0200
http://bitbucket.org/pypy/pypy/changeset/e92765580f57/

Log:	Keep the threads around for the next call to run()

diff --git a/lib_pypy/pypy_test/test_transaction.py b/lib_pypy/pypy_test/test_transaction.py
--- a/lib_pypy/pypy_test/test_transaction.py
+++ b/lib_pypy/pypy_test/test_transaction.py
@@ -1,5 +1,8 @@
 import py
-from lib_pypy import transaction
+try:
+    from lib_pypy import transaction
+except ImportError:
+    import transaction
 
 N = 1000
 VERBOSE = False
diff --git a/lib_pypy/transaction.py b/lib_pypy/transaction.py
--- a/lib_pypy/transaction.py
+++ b/lib_pypy/transaction.py
@@ -118,6 +118,8 @@
     finished.  The run() call only returns when the queue is completely
     empty.
     """
+    _nb_threads = 0
+    _thread_queue = queue()
 
     def __init__(self):
         self._queue = queue()
@@ -150,10 +152,16 @@
                 "TransactionQueue.run() cannot be called in an atomic context")
         if nb_segments <= 0:
             nb_segments = getsegmentlimit()
-
+        while TransactionQueue._nb_threads < nb_segments:
+            with atomic:
+                if TransactionQueue._nb_threads >= nb_segments:
+                    break
+                TransactionQueue._nb_threads += 1
+            thread.start_new_thread(TransactionQueue._thread_runner, ())
+        #
         self._exception = []
         for i in range(nb_segments):
-            thread.start_new_thread(self._thread_runner, ())
+            TransactionQueue._thread_queue.put((self._queue, self._exception))
         #
         # The threads run here until queue.join() returns, i.e. until
         # all add()ed transactions are executed.
@@ -164,29 +172,30 @@
         #
         if self._exception:
             exc_type, exc_value, exc_traceback = self._exception
-            self._exception = None
+            del self._exception
             raise exc_type, exc_value, exc_traceback
 
     #def number_of_transactions_executed(self):
     #    disabled for now
 
-    def _thread_runner(self):
-        queue = self._queue
-        exception = self._exception
+    @staticmethod
+    def _thread_runner():
         while True:
-            f, args, kwds = queue.get()
-            try:
-                if args is None:
-                    break
-                with atomic:
-                    if not exception:
-                        try:
-                            with signals_enabled:
-                                f(*args, **kwds)
-                        except:
-                            exception.extend(sys.exc_info())
-            finally:
-                queue.task_done()
+            queue, exception = TransactionQueue._thread_queue.get()
+            while True:
+                f, args, kwds = queue.get()
+                try:
+                    if args is None:
+                        break
+                    with atomic:
+                        if not exception:
+                            try:
+                                with signals_enabled:
+                                    f(*args, **kwds)
+                            except:
+                                exception.extend(sys.exc_info())
+                finally:
+                    queue.task_done()
 
 
 # ____________________________________________________________


More information about the pypy-commit mailing list