How to get a raised exception from other thread
Antoon Pardon
apardon at forel.vub.ac.be
Mon Oct 17 04:24:28 EDT 2005
Op 2005-10-14, dcrespo schreef <dcrespo at gmail.com>:
> Hi all,
>
> How can I get a raised exception from other thread that is in an
> imported module?
>
> For example:
You could try the following class. It needs ctypes, and if the exception
is raised while the thread is inside a C extension, the exception will be
delayed until the extension is left. Some delay is unavoidable. You may also
need to fiddle with the length of the sleep at the end of this script
or the values in the xrange calls in the Continue threads.
---------------------------- except.py --------------------------------
import os
import ctypes
from time import sleep
from random import randint
class TimeOut(Exception):
    """Exception that ``Xthread.alarm`` schedules for delivery to a thread."""
class Alarm(Exception):
    """Exception the demo script raises in worker threads to interrupt them."""
import threading
class Xthread(threading.Thread):
    """Thread into which another thread can asynchronously raise an exception.

    Delivery goes through the CPython C-API function
    ``PyThreadState_SetAsyncExc`` (reached via ``ctypes.pythonapi``).  The
    exception only surfaces when the target thread next runs Python
    bytecode, so while the thread is inside a C extension or a blocking
    call the delivery is postponed until control returns to the interpreter.

    The target is identified by ``Thread.ident``.  ``Thread.start()`` does
    not return before that attribute is set, so ``raize`` may be called
    right after ``start()`` without racing the new thread.
    """

    def raize(self, excpt):
        """Schedule ``excpt`` to be raised inside this thread.

        Args:
            excpt: the exception class to raise in the thread.

        Raises:
            RuntimeError: if the thread has never been started.

        A thread that has already finished is silently skipped: its
        identifier may have been recycled for an unrelated thread, and
        raising there would hit the wrong target.
        """
        thread_id = self.ident
        if thread_id is None:
            raise RuntimeError(
                "cannot raise an exception in a thread that was never started")
        if not self.is_alive():
            return

        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
        # Without an explicit C type ctypes marshals a Python int as a C
        # ``int``; the C parameter is an ``unsigned long``, so a bare int
        # would be truncated on platforms where long is wider than int.
        target = ctypes.c_ulong(thread_id)
        exc_object = ctypes.py_object(excpt)

        modified = set_async_exc(target, exc_object)
        while modified > 1:
            # More than one thread state was touched: clear the pending
            # exception again (None is passed as NULL), back off, retry.
            set_async_exc(target, None)
            sleep(0.1)
            modified = set_async_exc(target, exc_object)

    def alarm(self, tm):
        """Arrange for ``TimeOut`` to be raised in this thread after ``tm`` seconds.

        Args:
            tm: delay in seconds, handed to ``threading.Timer``.

        Returns:
            The already-started ``threading.Timer``; call ``cancel()`` on it
            to disarm the alarm.
        """
        timer = threading.Timer(tm, self.raize, (TimeOut,))
        timer.start()
        return timer
class Continue(Xthread):
    """Demo worker: burns CPU for a random while and reports how it ended."""

    def run(self):
        """Run a random amount of busy arithmetic, stopping early on ``Alarm``.

        Prints ``<id> Begin`` on entry, then ``<id> Finished`` if the loops
        ran to completion or ``<id> Interupted`` if ``Alarm`` was raised in
        this thread first.  ``<id>`` is also stored as ``self.id``.
        """
        # os.getpid() is the same for every thread of a process on
        # POSIX-conformant threading, so it cannot tell the workers apart;
        # the thread identifier can.
        self.id = self.ident
        # Each message is formatted into one string so the print behaves
        # the same whether it is a statement or a function call.
        print("%s Begin" % (self.id,))
        i = 0
        try:
            # Pure-Python busy work: asynchronous exceptions are only
            # delivered between bytecodes, so this loop is interruptible.
            for _ in xrange(randint(0, 20)):
                for e in xrange(4 * 100000):
                    i = i + e
            print("%s Finished" % (self.id,))
        except Alarm:
            print("%s Interupted" % (self.id,))
# Demo driver: create ten workers, start them all, give them fifteen
# seconds, then raise Alarm in every one of them.
lst = []
for _ in xrange(10):
    lst.append(Continue())

for worker in lst:
    worker.start()

try:
    sleep(15)
finally:
    # The finally clause makes sure the workers are signalled even when the
    # sleep itself is cut short by an exception in the main thread.
    for worker in lst:
        worker.raize(Alarm)
More information about the Python-list
mailing list