[ANNOUNCE] txObject ATK 1.3 for Python

Thomas Hazel tmhazel at mediaone.net
Sun Jan 14 20:14:15 EST 2001


Python Folks,

I have been a long-time user & lover of Python since late 1992. Over the years I
have created a toolkit for developing platform-independent C++ software. I
have also created some useful Python interfaces to these tools (Timers, IO
notification & processing, Threads/Events/Locks (home grown & native),
TCP Server, etc.).

The end of this email has a few Python test programs demonstrating some of the
Python modules, to pique your interest.

Take a look at http://txobject.sourceforge.net

See the READMEs in the tarballs to build txObject ATK 1.3 and the txPyObject
ATK 1.3 Python Modules.

Enjoy and Happy Python Programming

Tom

############################################################################
###
##
## Threads, Events, Locks
##
############################################################################
###

import txLock  # locks are not used in this test, but feel free to use them
import txEvent
import txThread

# Polled by func()'s loop; the main script sets it to 1 to end the thread.
EXIT_FLAG = 0
# Event passed to txThread.yield() inside func() and triggered by the main
# script once EXIT_FLAG has been set.
EXIT_EVENT = txEvent.txEvent("EXIT")

def func(*args):
    print 'args from func call : ', args

    while not EXIT_FLAG:
        print 'looping'
        txThread.yield(EXIT_EVENT, 1000)

    return

print 'Starting thread test'
# Run func on a txThread, handing it ("abc", 123) as its arguments.
txThread.start(func, ("abc", 123))

txThread.yield(10000) # wait ten seconds

# Ask func's loop to stop ...
EXIT_FLAG = 1

# ... and trigger the event func passes to txThread.yield(), presumably so
# it wakes up without waiting out its timeout.
txEvent.trigger(EXIT_EVENT)

############################################################################
###
##
## Timers
##
############################################################################
###

import txTimer
import txEvent
import txThread
import txExtendedTimer # this is not used here, but feel free to use it

# Number of timer firings counted so far; incremented by callback().
COUNT = 0

# Triggered by callback() once COUNT has reached 10; the main script waits on it.
EXIT_EVENT = txEvent.txEvent("EXIT")

def callback (*args):
    global COUNT
    print 'timer fired : ', args

    if COUNT < 10:
        COUNT = COUNT + 1
        return txTimer.CONTINUE
    else:
        txEvent.trigger(EXIT_EVENT)

    return txTimer.STOP

# Create a timer that calls callback with ("abc", 123).
timer = txTimer.txTimer(callback, ("abc", 123), 1000) # one second timer

# Wait on EXIT_EVENT, which callback triggers once COUNT has reached 10.
txThread.yield(EXIT_EVENT)

############################################################################
###
##
## IO notification
##
############################################################################
###

import sys
import txSync
import txEvent
import txThread

# Number of IO notifications counted so far; incremented by callback().
COUNT = 0

# Triggered by callback() once COUNT has reached 10; the main script waits on it.
EXIT_EVENT = txEvent.txEvent("EXIT")

def callback (*args):
    global COUNT

    read = raw_input("") # get data so to reset stdin fd state

    print 'io fired : ', args, read

    if COUNT < 10:
        COUNT = COUNT + 1
    else:
        txEvent.trigger(EXIT_EVENT)

    return

id = txSync.registerIO(callback,("abc",
123),sys.stdin.fileno(),txSync.IORead)

print '\nEntire something in the console window and then press turn...
repeat'

txThread.yield(EXIT_EVENT)

############################################################################
###
##
## TCP Server
##
############################################################################
###

import sys
import signal
import txEvent
import txThread
import txTCPServer

if len(sys.argv) != 5:
    print 'usage : local_machine local_port remote_machine remote_port'
    sys.exit(1)

local_machine = sys.argv[1]
local_port = eval(sys.argv[2])
remote_machine = sys.argv[3]
remote_port = eval(sys.argv[4])

EXIT_FLAG = 0
EXIT_EVENT = txEvent.txEvent("EXIT")

TCP_SERVER = txTCPServer.txTCPServer((local_machine, local_port))

def shutdown(*args):
    global EXIT_FLAG
    global EXIT_EVENT

    print 'Control-C'

    EXIT_FLAG = 1
    txEvent.trigger(EXIT_EVENT)
    return

def recvFrom(*args):
    while not EXIT_FLAG:
        messages = txTCPServer.recvFrom(TCP_SERVER) # get as many as
possible

        # num_of_mgs = 3 # block until 3 msgs have been received
        # messages = txTCPServer.recvFrom(TCP_SERVER, num_of_msgs)

        print 'Messages : ', messages
    return


def contactEstablish(*args):
    global EXIT_FLAG
    global EXIT_EVENT

    remote_machine, remote_port = args[0]

    print 'PEER UP : ', remote_machine, remote_port

    for i in range(1, 100):
        #
        # also see TCP SERVER multicast & broadcast
        #

        txTCPServer.sendTo( \
            TCP_SERVER, 'HELLO from ' + `local_port`, \
            (remote_machine, remote_port))

        txThread.yield(1000)

    EXIT_FLAG = 1
    txEvent.trigger(EXIT_EVENT)
    return

def contactLost(*args):
    global EXIT_FLAG
    global EXIT_EVENT

    remote_machine, remote_port = args[0]

    print 'PEER DOWN : ', remote_machine, remote_port
    return

# Route both SIGINT and SIGTERM to shutdown(), which sets EXIT_FLAG.
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)

# Hook up the peer-up / peer-down callbacks on the server object.
txTCPServer.registerContactEstablished(TCP_SERVER, contactEstablish)
txTCPServer.registerContactLost(TCP_SERVER, contactLost)

txThread.start(recvFrom) # start func on a thread to receive messages

# presumably initiates contact with the remote peer given on the command line
txTCPServer.contact(TCP_SERVER, (remote_machine, remote_port))

# Keep the main script alive until shutdown() or contactEstablish() sets
# EXIT_FLAG.
while not EXIT_FLAG:
    # just a dummy loop for debugging. you can also just wait on an event
    txThread.yield(1000)





More information about the Python-list mailing list