Micro-Threads: uthreadsleep.py -- scheduling of uthreads to continue processing after given seconds
Just van Rossum
just at letterror.com
Wed May 24 03:28:39 EDT 2000
At 12:28 AM -0400 24-05-2000, Mike Fletcher wrote:
>My guess would be there will be a number of sub-classes of this general pattern.
>These are currently represented by:
> time.sleep() -- important
> select.select() -- extremely important for network applications
It appears to me that select.select() is only relevant in non-threaded
programs (do you have an example of its use in a threaded
environment?).
I have been playing with a uthreaded socket wrapper that uses
select under the hood. I've pasted it below for your amusement.
It's in a very rough state, so don't expect too much from it...
Oh, for those of you interested in the latest uthread module, I've
put a copy here:
http://www.petr.nl/just/uthread4.py
Just
# usocket.py
import time
import sys
import select
import errno
import socket
socketmodule = socket # XXX!!!
from socket import *
import uthread4, continuation
# Timeout value that _Socket passes to _SelectDaemon.doAction() on every call.
# NOTE(review): doAction() currently ignores this value (see its XXX comment);
# the unit is presumably seconds -- confirm before relying on it.
TIMEOUT = 10
# Remember the genuine socket constructor exactly once.  Probing for the
# name first keeps an already-saved _real_socket from being overwritten if
# this code is executed again in the same namespace, at which point the
# name `socket` may no longer refer to the original constructor.
try:
    _real_socket
except NameError:
    _real_socket = socket
def socket(protocol, type):
    """Create a non-blocking socket wrapped in a ``_Socket``.

    Both arguments are passed unchanged to the real socket constructor
    saved in ``_real_socket``.  The new socket is switched to non-blocking
    mode before it is wrapped.

    Returns:
        A ``_Socket`` instance around the freshly created socket.
    """
    sock = _real_socket(protocol, type)
    # The wrapper never wants a call on the underlying socket to block.
    sock.setblocking(0)
    return _Socket(sock)
#socketmodule.socket = socket # XXX!!
class _Socket:
    """Wrapper around a non-blocking socket for use from uthreads.

    ``accept``, ``send`` and ``recv`` are routed through the module-level
    ``_selectDaemon``; ``connect`` uses ``connect_ex`` directly.  Every
    attribute that is not defined here is looked up on the wrapped socket.
    """

    def __init__(self, sock):
        """Wrap *sock*, the underlying socket object."""
        self.sock = sock

    def __getattr__(self, attr):
        # __getattr__ only runs when normal lookup fails.  If "sock" itself
        # is missing (instance created without __init__), evaluating
        # self.sock below would re-enter this method without end.
        if attr == "sock":
            raise AttributeError(attr)
        # simply delegate what we don't override
        return getattr(self.sock, attr)

    def setblocking(self, value):
        """Refuse to change the blocking mode; always raises ``error``."""
        raise error(-1, "Can't set blocking mode for uthreaded socket")

    def accept(self):
        """Accept a connection via the select daemon.

        Returns:
            A ``(_Socket, addrinfo)`` pair for the new connection.
        """
        conn, addrinfo = _selectDaemon.doAction(TIMEOUT, 0, self.sock, self.sock.accept)
        # An accepted socket does not reliably inherit the listener's
        # non-blocking flag (it does not on Linux), so set it explicitly;
        # otherwise a later recv/send on it could block the whole process.
        conn.setblocking(0)
        return _Socket(sock=conn), addrinfo

    def connect(self, address):
        """Start connecting to *address* without waiting for completion.

        Raises:
            error: if ``connect_ex`` reports anything other than success or
                "connection in progress".
        """
        err = self.sock.connect_ex(address)
        # EINPROGRESS (and EWOULDBLOCK, which Winsock reports instead) just
        # mean the non-blocking connect has been started.
        if err and err not in (errno.EINPROGRESS, errno.EWOULDBLOCK):
            # .get() so an unlisted code still surfaces as a socket error
            # instead of a KeyError.
            raise error(err, errno.errorcode.get(err, "unknown error"))

    def send(self, data):
        """Send *data* via the select daemon; returns what ``sock.send`` returns."""
        return _selectDaemon.doAction(TIMEOUT, 1, self.sock, self.sock.send, data)

    def recv(self, buffersize):
        """Receive up to *buffersize* bytes via the select daemon."""
        return _selectDaemon.doAction(TIMEOUT, 0, self.sock, self.sock.recv, buffersize)
class _SelectDaemon:
    """Run pending socket operations on behalf of blocked uthreads.

    ``doAction`` records ``(thread, func, args)`` under the socket, makes
    sure the daemon uthread is started and blocks the caller.  The daemon
    loop polls ``select.select`` with a zero timeout; for every socket
    reported ready it calls ``func(*args)``, stores the result (or posts
    the exception to the waiting thread) and calls ``thread.start()``.

    Pending operations are keyed by socket, separately for reading and
    writing, so a second registration for the same socket and direction
    replaces the first one.
    """

    def __init__(self):
        self.__thread = None   # daemon uthread, created on demand by __start
        self.__running = 0     # true once the daemon thread has been started
        self.__readers = {}    # sock -> (thread, func, args) waiting to read
        self.__writers = {}    # sock -> (thread, func, args) waiting to write
        self.__results = {}    # sock -> return value for the waiting thread

    def doAction(self, timeout, isWrite, sock, func, *args):
        """Block the current uthread until ``func(*args)`` has been run.

        Args:
            timeout: currently unused.
            isWrite: true to wait for *sock* to be writable, false for
                readable.
            sock: socket object handed to ``select.select``.
            func: callable to run once *sock* is ready.
            *args: arguments for *func*.

        Returns:
            Whatever *func* returned.
        """
        # XXX timeout is ignored for now
        thread = uthread4.getCurrentThread()
        if isWrite:
            self.__writers[sock] = (thread, func, args)
        else:
            self.__readers[sock] = (thread, func, args)
        continuation.uthread_lock(1)
        self.__start()
        thread.block()  # this also resets the uthread_lock
        result = self.__results[sock]
        del self.__results[sock]
        return result

    def __start(self):
        """Start the daemon uthread unless it is already marked running."""
        if self.__running:
            return
        if self.__thread is None:
            self.__thread = uthread4.Thread("SelectDaemon", self.__loop)
        self.__thread.start()
        self.__running = 1

    def __loop(self):
        """Body of the daemon uthread: poll the sockets and serve them."""
        while 1:
            readers = self.__readers.keys()
            writers = self.__writers.keys()
            if readers or writers:
                # Zero timeout: poll only, never block in select().
                readers, writers, err = select.select(readers, writers, [], 0)
                if not (readers or writers):
                    # Nothing is ready; let other uthreads run.
                    uthread4.switchContext()
                else:
                    for readSocket in readers:
                        if self.__doAction(readSocket, self.__readers[readSocket]):
                            del self.__readers[readSocket]
                    for writeSocket in writers:
                        if self.__doAction(writeSocket, self.__writers[writeSocket]):
                            del self.__writers[writeSocket]
            else:
                # No pending operations at all.
                # NOTE(review): the lock presumably keeps other uthreads from
                # running between the check below and block() -- confirm
                # against the continuation module.
                continuation.uthread_lock(1)
                if len(uthread4.getAllThreads()) <= 1:
                    # we're the last active thread
                    self.__thread = None
                    # Clear the running flag as well; otherwise a later
                    # __start() would see it still set, return early and
                    # never create a replacement daemon thread.
                    self.__running = 0
                    raise SystemExit
                else:
                    self.__running = 0
                    self.__thread.block()

    def __doAction(self, sock, action):
        """Run one pending operation for *sock*.

        Args:
            sock: the socket that select() reported ready.
            action: the ``(thread, func, args)`` tuple stored by doAction.

        Returns:
            0 if the call failed with EWOULDBLOCK and must stay pending,
            1 if it finished (result stored or exception posted) and the
            waiting thread has been started again.
        """
        thread, func, args = action
        try:
            rv = apply(func, args)
        except error:
            exc_type, why = sys.exc_info()[:2]
            if why.args[0] == errno.EWOULDBLOCK:
                return 0
            else:
                # pass the exception on to the thread that needs it...
                thread.postException(exc_type, why)
                thread.start()
                return 1
        else:
            self.__results[sock] = rv
            thread.start()
            return 1
# Single module-level daemon instance; the _Socket methods accept(), send()
# and recv() all route their calls through it.
_selectDaemon = _SelectDaemon()
if __name__ == "__main__":
    def client_test():
        """Fetch a page from www.python.org and report the chunk sizes."""
        s = socket(AF_INET, SOCK_STREAM)
        # Close the socket even if connecting or receiving raises.
        try:
            print("connecting...")
            s.connect(('www.python.org', 80))
            print("sending request...")
            print(s.send('GET /\r\n\r\n'))
            while 1:
                data = s.recv(256)
                if not data:
                    print("done!")
                    break
                print("incoming data: %s" % len(data))
        finally:
            s.close()

    def server_test():
        """Accept connections and serve each one in its own uthread."""
        def handle_request(conn, addrinfo, serverthread):
            # serverthread is passed in explicitly so that a "quit" request
            # can post SystemExit to the accepting thread.
            try:
                while 1:
                    print("waiting for data.. %s" % (addrinfo,))
                    data = conn.recv(256)
                    print("got some data: %s" % repr(data))
                    if not data or "\n" in data:
                        print("done %s" % (addrinfo,))
                        break
                    if data[:4] == "quit":
                        print("quitting server")
                        serverthread.postException(SystemExit)
                        break
            finally:
                # Release the connection even if recv() raised.
                conn.close()

        serverthread = uthread4.getCurrentThread()
        s = socket(AF_INET, SOCK_STREAM)
        try:
            # NOTE(review): unusual bind address and a privileged port --
            # confirm these are intended for the test machine.
            s.bind(("192.0.0.1", 999))
            s.listen(5)
            while 1:
                print("ready to accept requests.")
                conn, addrinfo = s.accept()
                print("incoming request from: %s" % (addrinfo,))
                uthread4.new(handle_request, conn, addrinfo, serverthread)
        finally:
            print("closing server socket.")
            s.close()

    uthread4.new(client_test)
    #uthread4.new(server_test)
    uthread4.run()
More information about the Python-list
mailing list