[pypy-commit] pypy default: Install the py3k version of C thread helpers.

amauryfa noreply at buildbot.pypy.org
Mon Jan 28 09:18:33 CET 2013


Author: Amaury Forgeot d'Arc <amauryfa at gmail.com>
Branch: 
Changeset: r60580:053753bebf0c
Date: 2013-01-28 09:14 +0100
http://bitbucket.org/pypy/pypy/changeset/053753bebf0c/

Log:	Install the py3k version of C thread helpers. Also implements
	Rlock.acquire_timed()

diff --git a/rpython/rlib/rthread.py b/rpython/rlib/rthread.py
--- a/rpython/rlib/rthread.py
+++ b/rpython/rlib/rthread.py
@@ -19,9 +19,9 @@
     separate_module_files = [translator_c_dir / 'src' / 'thread.c'],
     include_dirs = [translator_c_dir],
     export_symbols = ['RPyThreadGetIdent', 'RPyThreadLockInit',
-                      'RPyThreadAcquireLock', 'RPyThreadReleaseLock',
-                      'RPyGilAllocate', 'RPyGilYieldThread',
-                      'RPyGilRelease', 'RPyGilAcquire',
+                      'RPyThreadAcquireLock', 'RPyThreadAcquireLockTimed',
+                      'RPyThreadReleaseLock', 'RPyGilAllocate',
+                      'RPyGilYieldThread', 'RPyGilRelease', 'RPyGilAcquire',
                       'RPyThreadGetStackSize', 'RPyThreadSetStackSize',
                       'RPyOpaqueDealloc_ThreadLock',
                       'RPyThreadAfterFork']
@@ -61,6 +61,10 @@
 c_thread_acquirelock = llexternal('RPyThreadAcquireLock', [TLOCKP, rffi.INT],
                                   rffi.INT,
                                   threadsafe=True)    # release the GIL
+c_thread_acquirelock_timed = llexternal('RPyThreadAcquireLockTimed', 
+                                        [TLOCKP, rffi.LONGLONG, rffi.INT],
+                                        rffi.INT,
+                                        threadsafe=True)    # release the GIL
 c_thread_releaselock = llexternal('RPyThreadReleaseLock', [TLOCKP], lltype.Void,
                                   threadsafe=True)    # release the GIL
 
@@ -121,6 +125,12 @@
         res = rffi.cast(lltype.Signed, res)
         return bool(res)
 
+    def acquire_timed(self, timeout):
+        "timeout is in microseconds."
+        res = c_thread_acquirelock_timed(self._lock, timeout, 1)
+        res = rffi.cast(lltype.Signed, res)
+        return bool(res)
+
     def release(self):
         # Sanity check: the lock must be locked
         if self.acquire(False):
diff --git a/rpython/rlib/test/test_rthread.py b/rpython/rlib/test/test_rthread.py
--- a/rpython/rlib/test/test_rthread.py
+++ b/rpython/rlib/test/test_rthread.py
@@ -153,6 +153,23 @@
         answers = fn()
         assert answers == expected
 
+    def test_acquire_timed(self):
+        import time
+        def f():
+            l = allocate_lock()
+            l.acquire(True)
+            t1 = time.time()
+            ok = l.acquire_timed(1000000)
+            t2 = time.time()
+            delay = t2 - t1
+            if ok:
+                return delay
+            else:
+                return -delay
+        fn = self.getcompiled(f, [])
+        res = fn()
+        assert res < -1.0
+
 #class TestRunDirectly(AbstractThreadTests):
 #    def getcompiled(self, f, argtypes):
 #        return f
diff --git a/rpython/translator/c/src/thread.h b/rpython/translator/c/src/thread.h
--- a/rpython/translator/c/src/thread.h
+++ b/rpython/translator/c/src/thread.h
@@ -2,6 +2,14 @@
 #define __PYPY_THREAD_H
 #include <assert.h>
 
+#define RPY_TIMEOUT_T long long
+
+typedef enum RPyLockStatus {
+    RPY_LOCK_FAILURE = 0,
+    RPY_LOCK_ACQUIRED = 1,
+    RPY_LOCK_INTR
+} RPyLockStatus;
+
 #ifdef _WIN32
 #include "thread_nt.h"
 #else
diff --git a/rpython/translator/c/src/thread_nt.c b/rpython/translator/c/src/thread_nt.c
--- a/rpython/translator/c/src/thread_nt.c
+++ b/rpython/translator/c/src/thread_nt.c
@@ -102,47 +102,24 @@
 
 BOOL InitializeNonRecursiveMutex(PNRMUTEX mutex)
 {
-	mutex->owned = -1 ;  /* No threads have entered NonRecursiveMutex */
-	mutex->thread_id = 0 ;
-	mutex->hevent = CreateEvent(NULL, FALSE, FALSE, NULL) ;
-	return mutex->hevent != NULL ;	/* TRUE if the mutex is created */
+    mutex->sem = CreateSemaphore(NULL, 1, 1, NULL);
 }
 
 VOID DeleteNonRecursiveMutex(PNRMUTEX mutex)
 {
-	/* No in-use check */
-	CloseHandle(mutex->hevent) ;
-	mutex->hevent = NULL ; /* Just in case */
+    /* No in-use check */
+    CloseHandle(mutex->sem);
+    mutex->sem = NULL ; /* Just in case */
 }
 
 DWORD EnterNonRecursiveMutex(PNRMUTEX mutex, BOOL wait)
 {
-	/* Assume that the thread waits successfully */
-	DWORD ret ;
-
-	/* InterlockedIncrement(&mutex->owned) == 0 means that no thread currently owns the mutex */
-	if (!wait)
-	{
-		if (InterlockedCompareExchange(&mutex->owned, 0, -1) != -1)
-			return WAIT_TIMEOUT ;
-		ret = WAIT_OBJECT_0 ;
-	}
-	else
-		ret = InterlockedIncrement(&mutex->owned) ?
-			/* Some thread owns the mutex, let's wait... */
-			WaitForSingleObject(mutex->hevent, INFINITE) : WAIT_OBJECT_0 ;
-
-	mutex->thread_id = GetCurrentThreadId() ; /* We own it */
-	return ret ;
+    return WaitForSingleObject(mutex->sem, milliseconds);
 }
 
 BOOL LeaveNonRecursiveMutex(PNRMUTEX mutex)
 {
-	/* We don't own the mutex */
-	mutex->thread_id = 0 ;
-	return
-		InterlockedDecrement(&mutex->owned) < 0 ||
-		SetEvent(mutex->hevent) ; /* Other threads are waiting, wake one on them up */
+    return ReleaseSemaphore(mutex->sem, 1, NULL);
 }
 
 /************************************************************/
@@ -158,8 +135,8 @@
 
 void RPyOpaqueDealloc_ThreadLock(struct RPyOpaque_ThreadLock *lock)
 {
-	if (lock->hevent != NULL)
-		DeleteNonRecursiveMutex(lock);
+    if (lock->sem != NULL)
+	DeleteNonRecursiveMutex(lock);
 }
 
 /*
@@ -168,9 +145,40 @@
  * and 0 if the lock was not acquired. This means a 0 is returned
  * if the lock has already been acquired by this thread!
  */
+RPyLockStatus
+RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock,
+			  RPY_TIMEOUT_T microseconds, int intr_flag)
+{
+    /* For now, intr_flag does nothing on Windows, and lock acquires are
+     * uninterruptible.  */
+    PyLockStatus success;
+    PY_TIMEOUT_T milliseconds;
+
+    if (microseconds >= 0) {
+        milliseconds = microseconds / 1000;
+        if (microseconds % 1000 > 0)
+            ++milliseconds;
+        if ((DWORD) milliseconds != milliseconds)
+            Py_FatalError("Timeout too large for a DWORD, "
+                           "please check PY_TIMEOUT_MAX");
+    }
+    else
+        milliseconds = INFINITE;
+
+    if (lock && EnterNonRecursiveMutex(
+	    lock, (DWORD)milliseconds) == WAIT_OBJECT_0) {
+        success = PY_LOCK_ACQUIRED;
+    }
+    else {
+        success = PY_LOCK_FAILURE;
+    }
+
+    return success;
+}
+
 int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag)
 {
-	return EnterNonRecursiveMutex(lock, (waitflag != 0 ? INFINITE : 0)) == WAIT_OBJECT_0;
+    return RPyThreadAcquireLockTimed(lock, waitflag ? -1 : 0, /*intr_flag=*/0);
 }
 
 void RPyThreadReleaseLock(struct RPyOpaque_ThreadLock *lock)
diff --git a/rpython/translator/c/src/thread_nt.h b/rpython/translator/c/src/thread_nt.h
--- a/rpython/translator/c/src/thread_nt.h
+++ b/rpython/translator/c/src/thread_nt.h
@@ -5,16 +5,16 @@
  */
 
 typedef struct RPyOpaque_ThreadLock {
-	LONG   owned ;
-	DWORD  thread_id ;
-	HANDLE hevent ;
-};
+    HANDLE sem;
+} NRMUTEX, *PNRMUTEX;
 
 /* prototypes */
 long RPyThreadStart(void (*func)(void));
 int RPyThreadLockInit(struct RPyOpaque_ThreadLock *lock);
 void RPyOpaqueDealloc_ThreadLock(struct RPyOpaque_ThreadLock *lock);
 int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag);
+RPyLockStatus RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock,
+					RPY_TIMEOUT_T timeout, int intr_flag);
 void RPyThreadReleaseLock(struct RPyOpaque_ThreadLock *lock);
 long RPyThreadGetStackSize(void);
 long RPyThreadSetStackSize(long);
diff --git a/rpython/translator/c/src/thread_pthread.c b/rpython/translator/c/src/thread_pthread.c
--- a/rpython/translator/c/src/thread_pthread.c
+++ b/rpython/translator/c/src/thread_pthread.c
@@ -8,6 +8,7 @@
 #include <stdio.h>
 #include <errno.h>
 #include <assert.h>
+#include <sys/time.h>
 
 /* The following is hopefully equivalent to what CPython does
    (which is trying to compile a snippet of code using it) */
@@ -170,6 +171,29 @@
 #endif
 }
 
+#ifdef GETTIMEOFDAY_NO_TZ
+#define RPY_GETTIMEOFDAY(ptv) gettimeofday(ptv)
+#else
+#define RPY_GETTIMEOFDAY(ptv) gettimeofday(ptv, (struct timezone *)NULL)
+#endif
+
+#define RPY_MICROSECONDS_TO_TIMESPEC(microseconds, ts) \
+do { \
+    struct timeval tv; \
+    RPY_GETTIMEOFDAY(&tv); \
+    tv.tv_usec += microseconds % 1000000; \
+    tv.tv_sec += microseconds / 1000000; \
+    tv.tv_sec += tv.tv_usec / 1000000; \
+    tv.tv_usec %= 1000000; \
+    ts.tv_sec = tv.tv_sec; \
+    ts.tv_nsec = tv.tv_usec * 1000; \
+} while(0)
+
+int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag)
+{
+    return RPyThreadAcquireLockTimed(lock, waitflag ? -1 : 0, /*intr_flag=*/0);
+}
+
 /************************************************************/
 #ifdef USE_SEMAPHORES
 /************************************************************/
@@ -215,26 +239,50 @@
 	return (status == -1) ? errno : status;
 }
 
-int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag)
+RPyLockStatus
+RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock,
+			  RPY_TIMEOUT_T microseconds, int intr_flag)
 {
-	int success;
+	RPyLockStatus success;
 	sem_t *thelock = &lock->sem;
 	int status, error = 0;
+	struct timespec ts;
 
+	if (microseconds > 0)
+		RPY_MICROSECONDS_TO_TIMESPEC(microseconds, ts);
 	do {
-		if (waitflag)
-			status = rpythread_fix_status(sem_wait(thelock));
-		else
-			status = rpythread_fix_status(sem_trywait(thelock));
-	} while (status == EINTR); /* Retry if interrupted by a signal */
+	    if (microseconds > 0)
+		status = rpythread_fix_status(sem_timedwait(thelock, &ts));
+	    else if (microseconds == 0)
+		status = rpythread_fix_status(sem_trywait(thelock));
+	    else
+		status = rpythread_fix_status(sem_wait(thelock));
+	    /* Retry if interrupted by a signal, unless the caller wants to be
+	       notified.  */
+	} while (!intr_flag && status == EINTR);
 
-	if (waitflag) {
+	/* Don't check the status if we're stopping because of an interrupt.  */
+	if (!(intr_flag && status == EINTR)) {
+	    if (microseconds > 0) {
+		if (status != ETIMEDOUT)
+		    CHECK_STATUS("sem_timedwait");
+	    }
+	    else if (microseconds == 0) {
+		if (status != EAGAIN)
+		    CHECK_STATUS("sem_trywait");
+	    }
+	    else {
 		CHECK_STATUS("sem_wait");
-	} else if (status != EAGAIN) {
-		CHECK_STATUS("sem_trywait");
+	    }
 	}
-	
-	success = (status == 0) ? 1 : 0;
+
+	if (status == 0) {
+	    success = RPY_LOCK_ACQUIRED;
+	} else if (intr_flag && status == EINTR) {
+	    success = RPY_LOCK_INTR;
+	} else {
+	    success = RPY_LOCK_FAILURE;
+	}
 	return success;
 }
 
@@ -326,32 +374,63 @@
 	}
 }
 
-int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag)
+RPyLockStatus
+RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock,
+			  RPY_TIMEOUT_T microseconds, int intr_flag)
 {
-	int success;
+	RPyLockStatus success;
 	int status, error = 0;
 
 	status = pthread_mutex_lock( &lock->mut );
 	CHECK_STATUS("pthread_mutex_lock[1]");
-	success = lock->locked == 0;
 
-	if ( !success && waitflag ) {
+	if (lock->locked == 0) {
+	    success = RPY_LOCK_ACQUIRED;
+	} else if (microseconds == 0) {
+	    success = RPY_LOCK_FAILURE;
+	} else {
+		struct timespec ts;
+		if (microseconds > 0)
+		    RPY_MICROSECONDS_TO_TIMESPEC(microseconds, ts);
 		/* continue trying until we get the lock */
 
 		/* mut must be locked by me -- part of the condition
 		 * protocol */
-		while ( lock->locked ) {
-			status = pthread_cond_wait(&lock->lock_released,
-						   &lock->mut);
+		success = RPY_LOCK_FAILURE;
+		while (success == RPY_LOCK_FAILURE) {
+		    if (microseconds > 0) {
+			status = pthread_cond_timedwait(
+			    &lock->lock_released,
+			    &lock->mut, &ts);
+			if (status == ETIMEDOUT)
+			    break;
+			CHECK_STATUS("pthread_cond_timed_wait");
+		    }
+		    else {
+			status = pthread_cond_wait(
+			    &lock->lock_released,
+			    &lock->mut);
 			CHECK_STATUS("pthread_cond_wait");
+		    }
+
+		    if (intr_flag && status == 0 && lock->locked) {
+			/* We were woken up, but didn't get the lock.  We probably received
+			 * a signal.  Return RPY_LOCK_INTR to allow the caller to handle
+			 * it and retry.  */
+			success = RPY_LOCK_INTR;
+			break;
+		    } else if (status == 0 && !lock->locked) {
+			success = RPY_LOCK_ACQUIRED;
+		    } else {
+			success = RPY_LOCK_FAILURE;
+		    }
 		}
-		success = 1;
 	}
-	if (success) lock->locked = 1;
+	if (success == RPY_LOCK_ACQUIRED) lock->locked = 1;
 	status = pthread_mutex_unlock( &lock->mut );
 	CHECK_STATUS("pthread_mutex_unlock[1]");
 
-	if (error) success = 0;
+	if (error) success = RPY_LOCK_FAILURE;
 	return success;
 }
 
diff --git a/rpython/translator/c/src/thread_pthread.h b/rpython/translator/c/src/thread_pthread.h
--- a/rpython/translator/c/src/thread_pthread.h
+++ b/rpython/translator/c/src/thread_pthread.h
@@ -64,6 +64,8 @@
 int RPyThreadLockInit(struct RPyOpaque_ThreadLock *lock);
 void RPyOpaqueDealloc_ThreadLock(struct RPyOpaque_ThreadLock *lock);
 int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag);
+RPyLockStatus RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock,
+					RPY_TIMEOUT_T timeout, int intr_flag);
 void RPyThreadReleaseLock(struct RPyOpaque_ThreadLock *lock);
 long RPyThreadGetStackSize(void);
 long RPyThreadSetStackSize(long);


More information about the pypy-commit mailing list