[py-dev] distributed computing with py.execnet
Carl Friedrich Bolz
cfbolz at gmx.de
Tue Jun 21 00:40:35 CEST 2005
Hi Py-Dev!
I experimented a bit with py.execnet today and just want to share the
result of the experiment. I wrote a little proof of concept for
distributing a big computational task between several computers (think
cluster) without having to install anything on them.
There are several processes involved: a server that establishes
connections to the nodes via execnet gateways and sends each node its task
via the corresponding channel. The server then listens on a socket. When a
node has finished its task, it sends the server a notification via the
socket, and the server fetches the result via that node's channel. The node
then gets a new task via the channel.
I used the search for Mersenne primes with the Lucas-Lehmer test as the
computational task (which one wouldn't implement in Python, of course).
The whole implementation, server and primality test included, is fewer than
130 lines of code (see below). There are still some things missing (like a
clean shutdown, for example), but the basic concept works very well.
execnet rocks!
Regards,
Carl Friedrich
#-----------server.py--------------
import py
import SocketServer
# ---- server configuration ----
ip = "192.168.10.1"  # address the server listens on; also sent to every node
# SSH targets of the worker nodes, in "user@host" form.  (The mailing-list
# archive had rewritten the "@" as " at ", which ssh cannot resolve.)
nodes = ["cfbolz@linux", "jana@albert"]
port = 50007
# NOTE(review): the __main__ block passes 100 to TaskDistributor instead of
# this value, so it is currently unused -- confirm which size is intended.
chunk_size = 50
# Worker code (./task.py) that is executed on every node via execnet.
source = py.code.Source(py.path.local().join("task.py").read())
log = py.log.Producer("server")
# No consumer for "server" -- presumably this silences the log messages;
# install a consumer here to see them.
py.log.setconsumer("server", None)
class TaskDistributor(object):
    """Hand out consecutive chunks of exponents to remote worker nodes.

    For every node an SSH execnet gateway is opened and ``source`` (the
    worker script) is executed remotely; the resulting channels are kept in
    ``self.channels``, indexed by node number.  ``self.i`` is the first
    exponent that has not been handed out yet.

    Relies on the module-level ``source``, ``ip``, ``port`` and ``log``.
    """

    def __init__(self, nodes, chunk_size=1):
        """Connect to every node in ``nodes`` and send each its first chunk.

        :param nodes: SSH targets passed to ``py.execnet.SshGateway``
        :param chunk_size: number of consecutive exponents per task
        """
        self.i = 1
        self.nodes = nodes
        self.chunk_size = chunk_size
        self.channels = []
        for node in nodes:
            print(node)
            channel = py.execnet.SshGateway(node).remote_exec(source)
            self.channels.append(channel)
            assert channel.receive() == "online"
            # The order of these sends must match the receives in the
            # worker script: node index, server address, first chunk and a
            # start-up delay in seconds (3 per node) that gives the server
            # time to connect to all nodes before any of them reports back.
            channel.send(len(self.channels) - 1)
            channel.send((ip, port))
            channel.send(self._chunk())
            channel.send(len(nodes) * 3)
            assert channel.receive() == "ok"
            log.init("sent task %s" % (self.i,))
            self.i += self.chunk_size

    def _chunk(self):
        """Return the exponents of the next task (does not advance self.i)."""
        return list(range(self.i, self.i + self.chunk_size))

    def new_task(self, number):
        """Collect node ``number``'s results and give it the next chunk.

        Receives the node's list of ``(p, is_prime)`` pairs, acknowledges
        them, sends the next chunk, waits for the node's "ok", then reports
        every result on stdout.

        :param number: index of the node in ``self.channels``
        """
        channel = self.channels[number]
        results = channel.receive()
        channel.send("ok")
        channel.send(self._chunk())
        log.new_job(self.i)
        self.i += self.chunk_size
        assert channel.receive() == "ok"
        for p, is_prime in results:
            if is_prime:
                print("%s = 2**%s - 1 is a prime! result from node %s"
                      % (2 ** p - 1, p, number))
            else:
                # Report the exponent that was tested, not self.i (which is
                # the start of the chunk just handed out).
                print("2**%s - 1 is not prime" % (p,))
class RequestHandler(SocketServer.BaseRequestHandler):
    """Handle one "task finished" notification from a worker node.

    The node sends its index as decimal text (at most 10 bytes are read).
    The server answers "ok" and then exchanges results and the next chunk
    over that node's execnet channel via ``RequestHandler.distributor``,
    which must be assigned before the server starts serving.
    """

    def handle(self):
        try:
            try:
                number = int(self.request.recv(10))
            except ValueError:
                self.request.send("not a number")
                return
            log.request("from", number)
            # Reject indices that name no node: a negative value would
            # otherwise silently select a channel counted from the end.
            if not 0 <= number < len(RequestHandler.distributor.channels):
                self.request.send("unknown node")
                return
            self.request.send("ok")
            RequestHandler.distributor.new_task(number)
        finally:
            log.request("closing socket")
            self.request.close()
if __name__ == '__main__':
    # Connect to all nodes first; the handler reaches the distributor
    # through a class attribute.
    # NOTE(review): this passes 100 rather than the module-level
    # chunk_size (50), which is otherwise unused -- confirm the intent.
    distributor = TaskDistributor(nodes, 100)
    RequestHandler.distributor = distributor
    address = (ip, port)
    server = SocketServer.TCPServer(address, RequestHandler)
    print(server)
    server.serve_forever()
#-----------task.py--------------
import operator
import random
import socket
import time
import sys
# Optional: enable psyco if this node has it installed (presumably to speed
# up the number crunching below); if it cannot be imported, everything
# still runs unchanged.
try:
    import psyco
    psyco.full()
except ImportError:
    pass
def fermattest(p, tests=30):
    """Probabilistic Fermat primality test.

    Return False if ``p`` is certainly composite (or smaller than 2), and
    True if ``p`` passed ``tests`` rounds with random bases, i.e. is
    probably prime.  Carmichael numbers (e.g. 561) always pass.

    :param p: integer to test
    :param tests: number of random bases to try
    """
    if p < 2:
        # 0, 1 and negatives are not prime (and p == 0 would make pow fail).
        return False
    for _ in range(tests):
        # randint draws the base exactly for any size of p; scaling
        # random.random() would lose precision beyond 2**53.
        a = random.randint(1, p - 1)
        if pow(a, p, p) != a:
            return False
    return True
def M(p):
    """Return the Mersenne number 2**p - 1 for exponent ``p``."""
    return pow(2, p) - 1
def lucastest(p):
    """Lucas-Lehmer test: return True iff the Mersenne number M(p) is prime.

    The recurrence s -> s*s - 2 (mod M(p)), started at 4 and applied
    p - 2 times, ends at 0 exactly when M(p) is prime -- but only for odd
    p.  p == 2 (M(2) == 3, which is prime) is therefore handled
    separately.  process_task only calls this for exponents that passed
    fermattest.
    """
    if p == 2:
        return True
    m = M(p)
    s = 4
    for _ in range(p - 2):
        s = (s * s - 2) % m
    return s == 0
def process_task(task_data):
    """Test every exponent in ``task_data``; return ``[(p, is_prime), ...]``.

    The Fermat test on the exponent acts as a filter: a composite exponent
    always gives a composite 2**p - 1, so the Lucas-Lehmer test only runs
    for exponents that pass it.  Each exponent is printed as it is checked.
    """
    results = []
    for exponent in task_data:
        print(exponent)
        verdict = fermattest(exponent) and lucastest(exponent)
        results.append((exponent, verdict))
    return results
channel.send("online")
mynumber = str(channel.receive())
socketaddress = channel.receive()
task_data = channel.receive()
sleep = channel.receive()
channel.send("ok")
f = file("/tmp/out.txt", "w", 0)
print >> f, "sleeping"
time.sleep(sleep) # give the server time to establish all connections
print >> f, "starting to work"
while 1:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = process_task(task_data)
print >>f, result, socketaddress
print >>f, "connecting", s
try:
s.connect(socketaddress)
except Exception, e:
print >>f, "error!", e
f.close()
raise
print >>f, "connected2"
s.send(mynumber)
print >>f, "sent number"
assert s.recv(2) == "ok"
s.close()
channel.send(result)
print >>f, "sent result"
assert channel.receive() == "ok"
task_data = channel.receive()
print >>f, "new task data %s" % task_data
channel.send("ok")
More information about the Pytest-dev
mailing list