raise exceptions in generators/functions from other tasks/functions

Dominic nomail at nospam.no
Sat May 15 16:40:39 EDT 2004


Just in case someone is interested.
I came up with this solution since my previous
posting.
Maybe you've got some other ideas or criticism.
I'll listen ;-)

Ciao,
  Dominic

P.S. Python is really powerful!

Use-Case:

def driveTask(handler):
    """Example generator task that reacts to WallHit / NoPower signals.

    Each ``yield`` hands control back to the task handler.  After the
    first yield the task registers itself with *handler*, so the handler
    may raise one of the registered signals inside this generator the
    next time it is resumed.
    """
    print("  enter drive")
    yield "current speed"

    while True:
        try:
            # Registration is repeated every time the driving loop is
            # (re)entered, i.e. also after a signal has been handled.
            handler.register(driveTask, (NoPower, WallHit))
            while True:
                print("  adjust engine")
                yield "current speed"
                print("  steer")
        except WallHit:
            print("hit the wall :-(")
        except NoPower:
            print("need some fresh batteries :-)")


====================================================================

Full-Source code:

from types import GeneratorType

class TaskHandler(object):
    """Minimal round-robin scheduler for generator tasks and plain callables.

    A task is either a generator object, which is advanced by one step per
    ``loop()`` call, or any other callable, which is simply called with no
    arguments on every ``loop()`` call.
    """

    def __init__(self):
        # Tasks in the order they were added; loop() runs them in this order.
        self.tasks = []

    def addTask(self, task):
        """Append *task* (a generator object or a zero-argument callable)."""
        self.tasks.append(task)

    def removeTask(self, task):
        """Remove a previously added *task*.

        Raises ValueError if *task* is not currently registered.
        """
        # The task object itself is not a valid list index, so remove it
        # by value instead of using ``del self.tasks[task]``.
        self.tasks.remove(task)

    def loop(self):
        """Run every registered task once.

        A generator task is dropped when it is exhausted or when it yields
        a false value; plain callables stay registered until removed.
        Exceptions other than the generator's own StopIteration propagate
        to the caller.
        """
        # Iterate over a snapshot: removing from the list that is being
        # iterated would silently skip the task following the removed one.
        for t in list(self.tasks):
            if isinstance(t, GeneratorType):
                try:
                    # Built-in next() works on Python 2.6+ and Python 3.
                    alive = next(t)
                except StopIteration:
                    # Exhausted generator: treat like a false result.
                    alive = False
                if not alive:
                    try:
                        self.tasks.remove(t)
                    except ValueError:
                        # The task already removed itself during its step.
                        pass
            else:
                # normal method/function
                t()

	
from sys import settrace

class SignalingTaskHandler(TaskHandler):
	"""TaskHandler that can raise "signals" inside registered tasks.

	While the tasks run, a trace hook is installed with settrace().  For
	every frame whose code object was registered via register(), the hook
	raises the first currently active signal that is among the signals
	registered for that code object.  Signals queued with signal() are
	collected in newSignals and only become active after __step().
	"""

	def __init__(self):
		super(SignalingTaskHandler, self).__init__()

		# id(code object) -> signals passed to register() for that code
		self.function2signal = {}
		# signals the tracer raises during the current pass
		self.activeSignals = ()
		# signals queued by signal(); promoted to activeSignals by __step()
		self.newSignals = []
		# generator created from __loop(); loop() advances it by one step
		self.loopTask = self.__loop()

	def register(self, aFunction, signals):
		"""Associate *signals* with the code object of *aFunction*.

		The key is id(aFunction.func_code); func_code is the Python 2
		name of a function's code object.  A later call for the same
		function replaces the earlier entry.
		"""
		self.function2signal[id(aFunction.func_code)] = signals

	def signal(self, aSignal):
		"""Queue *aSignal* for the next step; duplicates are ignored."""
		if not aSignal in self.newSignals:
			self.newSignals.append(aSignal)

	def loop(self):
		"""Run one pass over all tasks by advancing the __loop() generator."""
		self.loopTask.next()

	def __loop(self):
		"""Generator behind loop(); each iteration is one pass over the tasks.

		Yields True after every pass.
		"""
		while 1:
			# Install the trace hook only for the duration of this pass.
			settrace(self.__intercept)
			# Remember how many signals were queued before the pass.
			tmp = len(self.newSignals)

			try:
                 		super(SignalingTaskHandler, self).loop()
             		finally:
                 		settrace(None)

			# if we've reached "steady state", we make a step
			# I think Esterel-language has similar signaling semantics
			# (steady state here means: no signal was queued during this pass)
             		if tmp == len(self.newSignals):
                			self.__step()

			yield True


	def __step(self):
		"""Promote the queued signals to the active set."""
		# switch to new signal set, discard old ones
		self.activeSignals = tuple(self.newSignals)
		self.newSignals = []

	def __intercept(self, frame, event, arg):
		"""Trace function handed to settrace().

		Returns the local tracer _i for frames whose code object has been
		registered, and None for all other frames.
		"""
		def _i(frame, event, arg):	
			# Local trace function for a registered frame.  It raises the
			# first active signal that is registered for this code object;
			# otherwise it falls through and implicitly returns None.
			# we could return _i to trace inside function
			# and not only calls to it
			
			if self.activeSignals:
				# key is the closure variable assigned below in __intercept
				signals = self.function2signal[key]
				# check if there is one signal to raise
         	    		for s in self.activeSignals:
                				if s in signals:
                	    				raise s
					
		# every function gets its own tracer, otherwise
		# exceptions cannot be caught :-( Why?
	     	key = id(frame.f_code)
         	if key in self.function2signal:
         		return _i
		
		return None


# A Use Case :


class WallHit(Exception):
    """Signal used by the example to tell a task that a wall was hit."""

class NoPower(Exception):
    """Signal used by the example to tell a task that power has run out."""

def driveTask(handler):
    """Example generator task that reacts to WallHit / NoPower signals.

    Each ``yield`` hands control back to the task handler.  After the
    first yield the task registers itself with *handler*, so the handler
    may raise one of the registered signals inside this generator the
    next time it is resumed.
    """
    print("  enter drive")
    yield "current speed"

    while True:
        try:
            # Registration is repeated every time the driving loop is
            # (re)entered, i.e. also after a signal has been handled.
            handler.register(driveTask, (NoPower, WallHit))
            while True:
                print("  adjust engine")
                yield "current speed"
                print("  steer")
        except WallHit:
            print("hit the wall :-(")
        except NoPower:
            print("need some fresh batteries :-)")

# Number of times miscTask() has run so far.
counter = 0


def miscTask():
    """Plain (non-generator) task that fires the example signals.

    On its fifth call (counter == 4) it queues WallHit on the global
    ``handler``; on its sixth call (counter == 5) it queues NoPower.
    The call counter is incremented afterwards.
    """
    global counter

    calls_so_far = counter
    if calls_so_far == 4:
        handler.signal(WallHit)
    elif calls_so_far == 5:
        handler.signal(NoPower)

    counter = calls_so_far + 1

# "special" tasks with signaling
# Tasks added to this handler run under its trace hook, so signals
# registered for them can be raised inside them.
handler = SignalingTaskHandler()
# Calling driveTask() only creates the generator object; its body starts
# running the first time the handler advances it.
tmp = driveTask(handler)
handler.addTask(tmp)

# main loop

# Outer scheduler: a plain TaskHandler that drives both the signal source
# and the signaling handler as ordinary callables.
mainTaskHandler = TaskHandler()

# well, there could be a lot more
# miscTask runs before handler.loop in every pass; a signal it queues is
# promoted at the end of that pass and raised during the following one.
mainTaskHandler.addTask(miscTask)
mainTaskHandler.addTask(handler.loop)


# Run ten passes of the outer scheduler.
for _ in range(10):
     mainTaskHandler.loop()


# Output of the ten passes above:
#   enter drive
#   adjust engine
#   steer
#   adjust engine
#   steer
#   adjust engine
#   steer
#   adjust engine
# hit the wall :-(
#   adjust engine
# need some fresh batteries :-)
#   adjust engine
#   steer
#   adjust engine
#   steer
#   adjust engine
#   steer
#   adjust engine



More information about the Python-list mailing list