[ANNOUNCE] txObject ATK 1.3 for Python
Thomas Hazel
tmhazel at mediaone.net
Sun Jan 14 20:14:15 EST 2001
Python Folks,
I have been a long-time user & lover of Python since late 1992. Over the years I
have created a toolkit for developing platform-independent C++ software. I
have also created some useful Python interfaces to these tools (Timers, IO
notification & processing, Threads/Events/Locks (home grown & native),
TCP Server, etc.).
The end of this email has a few Python test programs that demonstrate some of
the Python modules and should pique your interest.
Take a look at http://txobject.sourceforge.net
See the READMEs in the tarballs to build txObject ATK 1.3 and the txPyObject ATK
1.3 Python Modules.
Enjoy and Happy Python Programming
Tom
############################################################################
###
##
## Threads, Events, Locks
##
############################################################################
###
import txLock # locks are not used in this test, but feel free to use them
import txEvent
import txThread
EXIT_FLAG = 0
EXIT_EVENT = txEvent.txEvent("EXIT")
def func(*args):
print 'args from func call : ', args
while not EXIT_FLAG:
print 'looping'
txThread.yield(EXIT_EVENT, 1000)
return
print 'Starting thread test'
txThread.start(func, ("abc", 123))
txThread.yield(10000) # wait ten seconds
EXIT_FLAG = 1
txEvent.trigger(EXIT_EVENT)
############################################################################
###
##
## Timers
##
############################################################################
###
import txTimer
import txEvent
import txThread
import txExtendedTimer # this is not used here, but feel free to use it
COUNT = 0
EXIT_EVENT = txEvent.txEvent("EXIT")
def callback (*args):
global COUNT
print 'timer fired : ', args
if COUNT < 10:
COUNT = COUNT + 1
return txTimer.CONTINUE
else:
txEvent.trigger(EXIT_EVENT)
return txTimer.STOP
timer = txTimer.txTimer(callback, ("abc", 123), 1000) # one second timer
txThread.yield(EXIT_EVENT)
############################################################################
###
##
## IO notification
##
############################################################################
###
import sys
import txSync
import txEvent
import txThread
COUNT = 0
EXIT_EVENT = txEvent.txEvent("EXIT")
def callback (*args):
global COUNT
read = raw_input("") # get data so to reset stdin fd state
print 'io fired : ', args, read
if COUNT < 10:
COUNT = COUNT + 1
else:
txEvent.trigger(EXIT_EVENT)
return
id = txSync.registerIO(callback,("abc",
123),sys.stdin.fileno(),txSync.IORead)
print '\nEntire something in the console window and then press turn...
repeat'
txThread.yield(EXIT_EVENT)
############################################################################
###
##
## TCP Server
##
############################################################################
###
import sys
import signal
import txEvent
import txThread
import txTCPServer
if len(sys.argv) != 5:
print 'usage : local_machine local_port remote_machine remote_port'
sys.exit(1)
local_machine = sys.argv[1]
local_port = eval(sys.argv[2])
remote_machine = sys.argv[3]
remote_port = eval(sys.argv[4])
EXIT_FLAG = 0
EXIT_EVENT = txEvent.txEvent("EXIT")
TCP_SERVER = txTCPServer.txTCPServer((local_machine, local_port))
def shutdown(*args):
global EXIT_FLAG
global EXIT_EVENT
print 'Control-C'
EXIT_FLAG = 1
txEvent.trigger(EXIT_EVENT)
return
def recvFrom(*args):
while not EXIT_FLAG:
messages = txTCPServer.recvFrom(TCP_SERVER) # get as many as
possible
# num_of_mgs = 3 # block until 3 msgs have been received
# messages = txTCPServer.recvFrom(TCP_SERVER, num_of_msgs)
print 'Messages : ', messages
return
def contactEstablish(*args):
global EXIT_FLAG
global EXIT_EVENT
remote_machine, remote_port = args[0]
print 'PEER UP : ', remote_machine, remote_port
for i in range(1, 100):
#
# also see TCP SERVER multicast & broadcast
#
txTCPServer.sendTo( \
TCP_SERVER, 'HELLO from ' + `local_port`, \
(remote_machine, remote_port))
txThread.yield(1000)
EXIT_FLAG = 1
txEvent.trigger(EXIT_EVENT)
return
def contactLost(*args):
global EXIT_FLAG
global EXIT_EVENT
remote_machine, remote_port = args[0]
print 'PEER DOWN : ', remote_machine, remote_port
return
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)
txTCPServer.registerContactEstablished(TCP_SERVER, contactEstablish)
txTCPServer.registerContactLost(TCP_SERVER, contactLost)
txThread.start(recvFrom) # start func on a thread to receive messages
txTCPServer.contact(TCP_SERVER, (remote_machine, remote_port))
while not EXIT_FLAG:
# just a dummy loop for debugging. you can also just wait on an event
txThread.yield(1000)
More information about the Python-list
mailing list