[pypy-commit] pypy default: Install the py3k version of C thread helpers.
amauryfa
noreply at buildbot.pypy.org
Mon Jan 28 09:18:33 CET 2013
Author: Amaury Forgeot d'Arc <amauryfa at gmail.com>
Branch:
Changeset: r60580:053753bebf0c
Date: 2013-01-28 09:14 +0100
http://bitbucket.org/pypy/pypy/changeset/053753bebf0c/
Log: Install the py3k version of C thread helpers. Also implements
Rlock.acquire_timed()
diff --git a/rpython/rlib/rthread.py b/rpython/rlib/rthread.py
--- a/rpython/rlib/rthread.py
+++ b/rpython/rlib/rthread.py
@@ -19,9 +19,9 @@
separate_module_files = [translator_c_dir / 'src' / 'thread.c'],
include_dirs = [translator_c_dir],
export_symbols = ['RPyThreadGetIdent', 'RPyThreadLockInit',
- 'RPyThreadAcquireLock', 'RPyThreadReleaseLock',
- 'RPyGilAllocate', 'RPyGilYieldThread',
- 'RPyGilRelease', 'RPyGilAcquire',
+ 'RPyThreadAcquireLock', 'RPyThreadAcquireLockTimed',
+ 'RPyThreadReleaseLock', 'RPyGilAllocate',
+ 'RPyGilYieldThread', 'RPyGilRelease', 'RPyGilAcquire',
'RPyThreadGetStackSize', 'RPyThreadSetStackSize',
'RPyOpaqueDealloc_ThreadLock',
'RPyThreadAfterFork']
@@ -61,6 +61,10 @@
c_thread_acquirelock = llexternal('RPyThreadAcquireLock', [TLOCKP, rffi.INT],
rffi.INT,
threadsafe=True) # release the GIL
+c_thread_acquirelock_timed = llexternal('RPyThreadAcquireLockTimed',
+ [TLOCKP, rffi.LONGLONG, rffi.INT],
+ rffi.INT,
+ threadsafe=True) # release the GIL
c_thread_releaselock = llexternal('RPyThreadReleaseLock', [TLOCKP], lltype.Void,
threadsafe=True) # release the GIL
@@ -121,6 +125,12 @@
res = rffi.cast(lltype.Signed, res)
return bool(res)
+ def acquire_timed(self, timeout):
+ "timeout is in microseconds."
+ res = c_thread_acquirelock_timed(self._lock, timeout, 1)
+ res = rffi.cast(lltype.Signed, res)
+ return bool(res)
+
def release(self):
# Sanity check: the lock must be locked
if self.acquire(False):
diff --git a/rpython/rlib/test/test_rthread.py b/rpython/rlib/test/test_rthread.py
--- a/rpython/rlib/test/test_rthread.py
+++ b/rpython/rlib/test/test_rthread.py
@@ -153,6 +153,23 @@
answers = fn()
assert answers == expected
+ def test_acquire_timed(self):
+ import time
+ def f():
+ l = allocate_lock()
+ l.acquire(True)
+ t1 = time.time()
+ ok = l.acquire_timed(1000000)
+ t2 = time.time()
+ delay = t2 - t1
+ if ok:
+ return delay
+ else:
+ return -delay
+ fn = self.getcompiled(f, [])
+ res = fn()
+ assert res < -1.0
+
#class TestRunDirectly(AbstractThreadTests):
# def getcompiled(self, f, argtypes):
# return f
diff --git a/rpython/translator/c/src/thread.h b/rpython/translator/c/src/thread.h
--- a/rpython/translator/c/src/thread.h
+++ b/rpython/translator/c/src/thread.h
@@ -2,6 +2,14 @@
#define __PYPY_THREAD_H
#include <assert.h>
+#define RPY_TIMEOUT_T long long
+
+typedef enum RPyLockStatus {
+ RPY_LOCK_FAILURE = 0,
+ RPY_LOCK_ACQUIRED = 1,
+ RPY_LOCK_INTR
+} RPyLockStatus;
+
#ifdef _WIN32
#include "thread_nt.h"
#else
diff --git a/rpython/translator/c/src/thread_nt.c b/rpython/translator/c/src/thread_nt.c
--- a/rpython/translator/c/src/thread_nt.c
+++ b/rpython/translator/c/src/thread_nt.c
@@ -102,47 +102,24 @@
BOOL InitializeNonRecursiveMutex(PNRMUTEX mutex)
{
- mutex->owned = -1 ; /* No threads have entered NonRecursiveMutex */
- mutex->thread_id = 0 ;
- mutex->hevent = CreateEvent(NULL, FALSE, FALSE, NULL) ;
- return mutex->hevent != NULL ; /* TRUE if the mutex is created */
+ mutex->sem = CreateSemaphore(NULL, 1, 1, NULL);
}
VOID DeleteNonRecursiveMutex(PNRMUTEX mutex)
{
- /* No in-use check */
- CloseHandle(mutex->hevent) ;
- mutex->hevent = NULL ; /* Just in case */
+ /* No in-use check */
+ CloseHandle(mutex->sem);
+ mutex->sem = NULL ; /* Just in case */
}
DWORD EnterNonRecursiveMutex(PNRMUTEX mutex, BOOL wait)
{
- /* Assume that the thread waits successfully */
- DWORD ret ;
-
- /* InterlockedIncrement(&mutex->owned) == 0 means that no thread currently owns the mutex */
- if (!wait)
- {
- if (InterlockedCompareExchange(&mutex->owned, 0, -1) != -1)
- return WAIT_TIMEOUT ;
- ret = WAIT_OBJECT_0 ;
- }
- else
- ret = InterlockedIncrement(&mutex->owned) ?
- /* Some thread owns the mutex, let's wait... */
- WaitForSingleObject(mutex->hevent, INFINITE) : WAIT_OBJECT_0 ;
-
- mutex->thread_id = GetCurrentThreadId() ; /* We own it */
- return ret ;
+ return WaitForSingleObject(mutex->sem, milliseconds);
}
BOOL LeaveNonRecursiveMutex(PNRMUTEX mutex)
{
- /* We don't own the mutex */
- mutex->thread_id = 0 ;
- return
- InterlockedDecrement(&mutex->owned) < 0 ||
- SetEvent(mutex->hevent) ; /* Other threads are waiting, wake one on them up */
+ return ReleaseSemaphore(mutex->sem, 1, NULL);
}
/************************************************************/
@@ -158,8 +135,8 @@
void RPyOpaqueDealloc_ThreadLock(struct RPyOpaque_ThreadLock *lock)
{
- if (lock->hevent != NULL)
- DeleteNonRecursiveMutex(lock);
+ if (lock->sem != NULL)
+ DeleteNonRecursiveMutex(lock);
}
/*
@@ -168,9 +145,40 @@
* and 0 if the lock was not acquired. This means a 0 is returned
* if the lock has already been acquired by this thread!
*/
+RPyLockStatus
+RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock,
+ RPY_TIMEOUT_T microseconds, int intr_flag)
+{
+ /* Fow now, intr_flag does nothing on Windows, and lock acquires are
+ * uninterruptible. */
+ PyLockStatus success;
+ PY_TIMEOUT_T milliseconds;
+
+ if (microseconds >= 0) {
+ milliseconds = microseconds / 1000;
+ if (microseconds % 1000 > 0)
+ ++milliseconds;
+ if ((DWORD) milliseconds != milliseconds)
+ Py_FatalError("Timeout too large for a DWORD, "
+ "please check PY_TIMEOUT_MAX");
+ }
+ else
+ milliseconds = INFINITE;
+
+ if (lock && EnterNonRecursiveMutex(
+ lock, (DWORD)milliseconds) == WAIT_OBJECT_0) {
+ success = PY_LOCK_ACQUIRED;
+ }
+ else {
+ success = PY_LOCK_FAILURE;
+ }
+
+ return success;
+}
+
int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag)
{
- return EnterNonRecursiveMutex(lock, (waitflag != 0 ? INFINITE : 0)) == WAIT_OBJECT_0;
+ return RPyThreadAcquireLockTimed(lock, waitflag ? -1 : 0, /*intr_flag=*/0);
}
void RPyThreadReleaseLock(struct RPyOpaque_ThreadLock *lock)
diff --git a/rpython/translator/c/src/thread_nt.h b/rpython/translator/c/src/thread_nt.h
--- a/rpython/translator/c/src/thread_nt.h
+++ b/rpython/translator/c/src/thread_nt.h
@@ -5,16 +5,16 @@
*/
typedef struct RPyOpaque_ThreadLock {
- LONG owned ;
- DWORD thread_id ;
- HANDLE hevent ;
-};
+ HANDLE sem;
+} NRMUTEX, *PNRMUTEX;
/* prototypes */
long RPyThreadStart(void (*func)(void));
int RPyThreadLockInit(struct RPyOpaque_ThreadLock *lock);
void RPyOpaqueDealloc_ThreadLock(struct RPyOpaque_ThreadLock *lock);
int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag);
+RPyLockStatus RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock,
+ RPY_TIMEOUT_T timeout, int intr_flag);
void RPyThreadReleaseLock(struct RPyOpaque_ThreadLock *lock);
long RPyThreadGetStackSize(void);
long RPyThreadSetStackSize(long);
diff --git a/rpython/translator/c/src/thread_pthread.c b/rpython/translator/c/src/thread_pthread.c
--- a/rpython/translator/c/src/thread_pthread.c
+++ b/rpython/translator/c/src/thread_pthread.c
@@ -8,6 +8,7 @@
#include <stdio.h>
#include <errno.h>
#include <assert.h>
+#include <sys/time.h>
/* The following is hopefully equivalent to what CPython does
(which is trying to compile a snippet of code using it) */
@@ -170,6 +171,29 @@
#endif
}
+#ifdef GETTIMEOFDAY_NO_TZ
+#define RPY_GETTIMEOFDAY(ptv) gettimeofday(ptv)
+#else
+#define RPY_GETTIMEOFDAY(ptv) gettimeofday(ptv, (struct timezone *)NULL)
+#endif
+
+#define RPY_MICROSECONDS_TO_TIMESPEC(microseconds, ts) \
+do { \
+ struct timeval tv; \
+ RPY_GETTIMEOFDAY(&tv); \
+ tv.tv_usec += microseconds % 1000000; \
+ tv.tv_sec += microseconds / 1000000; \
+ tv.tv_sec += tv.tv_usec / 1000000; \
+ tv.tv_usec %= 1000000; \
+ ts.tv_sec = tv.tv_sec; \
+ ts.tv_nsec = tv.tv_usec * 1000; \
+} while(0)
+
+int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag)
+{
+ return RPyThreadAcquireLockTimed(lock, waitflag ? -1 : 0, /*intr_flag=*/0);
+}
+
/************************************************************/
#ifdef USE_SEMAPHORES
/************************************************************/
@@ -215,26 +239,50 @@
return (status == -1) ? errno : status;
}
-int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag)
+RPyLockStatus
+RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock,
+ RPY_TIMEOUT_T microseconds, int intr_flag)
{
- int success;
+ RPyLockStatus success;
sem_t *thelock = &lock->sem;
int status, error = 0;
+ struct timespec ts;
+ if (microseconds > 0)
+ RPY_MICROSECONDS_TO_TIMESPEC(microseconds, ts);
do {
- if (waitflag)
- status = rpythread_fix_status(sem_wait(thelock));
- else
- status = rpythread_fix_status(sem_trywait(thelock));
- } while (status == EINTR); /* Retry if interrupted by a signal */
+ if (microseconds > 0)
+ status = rpythread_fix_status(sem_timedwait(thelock, &ts));
+ else if (microseconds == 0)
+ status = rpythread_fix_status(sem_trywait(thelock));
+ else
+ status = rpythread_fix_status(sem_wait(thelock));
+ /* Retry if interrupted by a signal, unless the caller wants to be
+ notified. */
+ } while (!intr_flag && status == EINTR);
- if (waitflag) {
+ /* Don't check the status if we're stopping because of an interrupt. */
+ if (!(intr_flag && status == EINTR)) {
+ if (microseconds > 0) {
+ if (status != ETIMEDOUT)
+ CHECK_STATUS("sem_timedwait");
+ }
+ else if (microseconds == 0) {
+ if (status != EAGAIN)
+ CHECK_STATUS("sem_trywait");
+ }
+ else {
CHECK_STATUS("sem_wait");
- } else if (status != EAGAIN) {
- CHECK_STATUS("sem_trywait");
+ }
}
-
- success = (status == 0) ? 1 : 0;
+
+ if (status == 0) {
+ success = RPY_LOCK_ACQUIRED;
+ } else if (intr_flag && status == EINTR) {
+ success = RPY_LOCK_INTR;
+ } else {
+ success = RPY_LOCK_FAILURE;
+ }
return success;
}
@@ -326,32 +374,63 @@
}
}
-int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag)
+RPyLockStatus
+RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock,
+ RPY_TIMEOUT_T microseconds, int intr_flag)
{
- int success;
+ RPyLockStatus success;
int status, error = 0;
status = pthread_mutex_lock( &lock->mut );
CHECK_STATUS("pthread_mutex_lock[1]");
- success = lock->locked == 0;
- if ( !success && waitflag ) {
+ if (lock->locked == 0) {
+ success = RPY_LOCK_ACQUIRED;
+ } else if (microseconds == 0) {
+ success = RPY_LOCK_FAILURE;
+ } else {
+ struct timespec ts;
+ if (microseconds > 0)
+ RPY_MICROSECONDS_TO_TIMESPEC(microseconds, ts);
/* continue trying until we get the lock */
/* mut must be locked by me -- part of the condition
* protocol */
- while ( lock->locked ) {
- status = pthread_cond_wait(&lock->lock_released,
- &lock->mut);
+ success = RPY_LOCK_FAILURE;
+ while (success == RPY_LOCK_FAILURE) {
+ if (microseconds > 0) {
+ status = pthread_cond_timedwait(
+ &lock->lock_released,
+ &lock->mut, &ts);
+ if (status == ETIMEDOUT)
+ break;
+ CHECK_STATUS("pthread_cond_timed_wait");
+ }
+ else {
+ status = pthread_cond_wait(
+ &lock->lock_released,
+ &lock->mut);
CHECK_STATUS("pthread_cond_wait");
+ }
+
+ if (intr_flag && status == 0 && lock->locked) {
+ /* We were woken up, but didn't get the lock. We probably received
+ * a signal. Return RPY_LOCK_INTR to allow the caller to handle
+ * it and retry. */
+ success = RPY_LOCK_INTR;
+ break;
+ } else if (status == 0 && !lock->locked) {
+ success = RPY_LOCK_ACQUIRED;
+ } else {
+ success = RPY_LOCK_FAILURE;
+ }
}
- success = 1;
}
- if (success) lock->locked = 1;
+ if (success == RPY_LOCK_ACQUIRED) lock->locked = 1;
status = pthread_mutex_unlock( &lock->mut );
CHECK_STATUS("pthread_mutex_unlock[1]");
- if (error) success = 0;
+ if (error) success = RPY_LOCK_FAILURE;
return success;
}
diff --git a/rpython/translator/c/src/thread_pthread.h b/rpython/translator/c/src/thread_pthread.h
--- a/rpython/translator/c/src/thread_pthread.h
+++ b/rpython/translator/c/src/thread_pthread.h
@@ -64,6 +64,8 @@
int RPyThreadLockInit(struct RPyOpaque_ThreadLock *lock);
void RPyOpaqueDealloc_ThreadLock(struct RPyOpaque_ThreadLock *lock);
int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag);
+RPyLockStatus RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock,
+ RPY_TIMEOUT_T timeout, int intr_flag);
void RPyThreadReleaseLock(struct RPyOpaque_ThreadLock *lock);
long RPyThreadGetStackSize(void);
long RPyThreadSetStackSize(long);
More information about the pypy-commit
mailing list