Micro-Threads: uthreadsleep.py -- scheduling of uthreads to continue processing after given seconds

Just van Rossum just at letterror.com
Wed May 24 03:28:39 EDT 2000


At 12:28 AM -0400 24-05-2000, Mike Fletcher wrote:
>My guess would be there will be a number of sub-classes of this general pattern.
>These are currently represented by:
>	time.sleep() -- important
>	select.select() -- extremely important for network applications

It appears to me that select.select() is only relevant in non-threaded 
programs (do you have an example of its use in a threaded 
environment?).

I have been playing with a uthreaded socket wrapper that uses 
select under the hood. I've pasted it below for your amusement. 
It's in a very rough state, so don't expect too much from it...

Oh, for those of you interested in the latest uthread module, I've 
put a copy here:

  http://www.petr.nl/just/uthread4.py

Just

# usocket.py
import time
import sys
import select
import errno
import socket
socketmodule = socket  # XXX!!!
from socket import *
import uthread4, continuation

# Timeout value handed to _selectDaemon.doAction() by the _Socket
# methods (doAction currently ignores it).
TIMEOUT = 10

# Capture the socket constructor under a private name, but only if an
# earlier execution of this module has not already done so; the public
# name 'socket' is redefined as a wrapper function further down.
if "_real_socket" not in globals():
	_real_socket = socket

def socket(protocol, type):
	"""Create a non-blocking socket wrapped for use from uthreads.

	Both arguments are forwarded positionally, in order, to the real
	socket constructor saved in _real_socket.  The new socket is put
	into non-blocking mode and returned inside a _Socket wrapper.
	"""
	wrapped = _Socket(_real_socket(protocol, type))
	# Go through the raw socket: the wrapper's own setblocking() refuses
	# every call by design.
	wrapped.sock.setblocking(0)
	return wrapped

#socketmodule.socket = socket  # XXX!!

class _Socket:
	"""Wrapper that makes a non-blocking socket usable from uthreads.

	accept(), send() and recv() are routed through
	_selectDaemon.doAction(), which parks the calling uthread until the
	daemon has performed the call.  Every attribute not defined here is
	delegated to the wrapped socket object, available as 'sock'.
	"""

	# connect_ex() results meaning "the non-blocking connect has been
	# started and will complete later"; which one is reported depends on
	# the platform.
	_CONNECT_PENDING = (errno.EINPROGRESS, errno.EWOULDBLOCK)

	def __init__(self, sock):
		"""Wrap 'sock', which is expected to be in non-blocking mode."""
		self.sock = sock

	def __getattr__(self, attr):
		"""Delegate whatever this class does not override to the socket."""
		if attr == "sock":
			# 'sock' itself is missing (e.g. __init__ never ran);
			# looking it up again below would recurse without end.
			raise AttributeError(attr)
		return getattr(self.sock, attr)

	def setblocking(self, value):
		"""Always raise socket.error: the wrapper must stay non-blocking."""
		raise error(-1, "Can't set blocking mode for uthreaded socket")

	def accept(self):
		"""Wait for a connection; return (wrapped connection, address)."""
		conn, addrinfo = _selectDaemon.doAction(TIMEOUT, 0, self.sock, self.sock.accept)
		# Accepted sockets do not inherit non-blocking mode on every
		# platform, so set it explicitly before wrapping.
		conn.setblocking(0)
		return _Socket(sock=conn), addrinfo

	def connect(self, address):
		"""Start connecting to 'address' without waiting for completion.

		Raises socket.error (errno, name) when connect_ex() reports
		anything other than success or "connection in progress".
		"""
		err = self.sock.connect_ex(address)
		if err and err not in self._CONNECT_PENDING:
			# .get(): not every error number has an entry in errorcode
			raise error(err, errno.errorcode.get(err, "Unknown error"))

	def send(self, data):
		"""Send 'data' via the select daemon; return the send() result."""
		return _selectDaemon.doAction(TIMEOUT, 1, self.sock, self.sock.send, data)

	def recv(self, buffersize):
		"""Receive up to 'buffersize' bytes via the select daemon."""
		return _selectDaemon.doAction(TIMEOUT, 0, self.sock, self.sock.recv, buffersize)


class _SelectDaemon:
	# Runs pending socket calls on behalf of uthreads.  A uthread hands
	# its call to doAction() and blocks; the daemon's own uthread
	# (__loop) polls select.select(), performs the call once the socket
	# is reported ready, and restarts the waiting uthread.
	
	def __init__(self):
		# the daemon's uthread4.Thread, created lazily by __start()
		self.__thread = None
		# 1 while the daemon thread has been started and not parked itself
		self.__running = 0
		# pending operations: socket -> (waiting thread, func, args)
		self.__readers = {}
		self.__writers = {}
		# finished operations: socket -> return value of func
		self.__results = {}
	
	def doAction(self, timeout, isWrite, sock, func, *args):
		# Register func(*args) to be run once 'sock' is readable (or
		# writable when isWrite is true), block the calling uthread until
		# the daemon has run it, and return func's result.
		# Entries are keyed by socket, so a second pending read (or write)
		# on the same socket replaces the earlier entry, and a read and a
		# write on the same socket share one result slot.
		# XXX timeout is ignored for now
		thread = uthread4.getCurrentThread()
		if isWrite:
			self.__writers[sock] = (thread, func, args)
		else:
			self.__readers[sock] = (thread, func, args)
		# NOTE(review): presumably prevents a uthread switch between
		# starting the daemon and blocking below -- confirm against the
		# continuation module.
		continuation.uthread_lock(1)
		
		self.__start()
		
		thread.block() # this also resets the uthread_lock
		# NOTE(review): when __doAction posted an exception instead of a
		# result, block() presumably raises it here -- otherwise the lookup
		# below would fail with KeyError; confirm against uthread4.
		result = self.__results[sock]
		del self.__results[sock]
		return result
	
	def __start(self):
		# Make sure the daemon thread is going; does nothing when it is
		# already marked as running.
		if self.__running:
			return
		if self.__thread is None:
			self.__thread = uthread4.Thread("SelectDaemon", self.__loop)
		self.__thread.start()
		self.__running = 1
	
	def __loop(self):
		# Body of the daemon thread.
		while 1:
			readers = self.__readers.keys()
			writers = self.__writers.keys()
			if readers or writers:
				# timeout 0: poll only, never wait inside select()
				readers, writers, err = select.select(readers, writers, [], 0)
				if not (readers or writers):
					# nothing ready yet: hand control to other uthreads
					uthread4.switchContext()
				else:
					# an entry is dropped only when __doAction reports
					# that the operation finished (result or exception)
					for readSocket in readers:
						if self.__doAction(readSocket, self.__readers[readSocket]):
							del self.__readers[readSocket]
					for writeSocket in writers:
						if self.__doAction(writeSocket, self.__writers[writeSocket]):
							del self.__writers[writeSocket]
			else:
				# No pending operations at all.  The return value kept in
				# 'tmp' is not used afterwards.
				tmp = continuation.uthread_lock(1)
				if len(uthread4.getAllThreads()) <= 1:
					# we're the last active thread
					self.__thread = None
					raise SystemExit
				else:
					# park the daemon; __start() starts it again on the
					# next doAction() call
					self.__running = 0
					self.__thread.block()
	
	def __doAction(self, sock, (thread, func, args)):
		# Run one pending call.  Returns 1 when the operation is finished
		# (the result was stored, or an exception was posted to the waiting
		# thread) and 0 when it has to be retried later.  Only socket
		# 'error' exceptions are caught here.
		try:
			rv = apply(func, args)
		except error, why:
			if why[0] == errno.EWOULDBLOCK:
				# not actually ready: keep the entry for a later pass
				return 0
			else:
				# pass the exception on to the thread that needs it...
				thread.postException(sys.exc_info()[0], sys.exc_info()[1])
				thread.start()
				return 1
		else:
			self.__results[sock] = rv
			# NOTE(review): start() is presumably what resumes the thread
			# blocked in doAction() -- semantics live in uthread4.
			thread.start()
			return 1


# The single daemon instance used by every _Socket method; its uthread
# is not created until the first doAction() call.
_selectDaemon = _SelectDaemon()

if __name__ == "__main__":
	def client_test():
		s = socket(AF_INET, SOCK_STREAM)
		print "connecting..."
		s.connect(('www.python.org', 80))
		print "sending request..."
		print s.send('GET /\r\n\r\n')
		while 1:
			data = s.recv(256)
			if not data:
				print "done!"
				break
			print "incoming data:", len(data)
		s.close()
	
	def server_test():
		def handle_request(conn, addrinfo, serverthread):
			while 1:
				print "waiting for data..", addrinfo
				data = conn.recv(256)
				print "got some data:", `data`
				if not data or "\n" in data:
					print "done", addrinfo
					break
				if data[:4] == "quit":
					print "quitting server"
					serverthread.postException(SystemExit)
					break
			conn.close()
		
		serverthread = uthread4.getCurrentThread()
		
		s = socket(AF_INET, SOCK_STREAM)
		s.bind(("192.0.0.1", 999))
		s.listen(5)
		try:
			while 1:
				print "ready to accept requests."
				conn, addrinfo = s.accept()
				print "incoming request from:", addrinfo
				uthread4.new(handle_request, conn, addrinfo, serverthread)
		finally:
			print "closing server socket."
			s.close()
	
	uthread4.new(client_test)
	#uthread4.new(server_test)
	uthread4.run()







More information about the Python-list mailing list