[py-dev] distributed computing with py.execnet

Carl Friedrich Bolz cfbolz at gmx.de
Tue Jun 21 00:40:35 CEST 2005


Hi Py-Dev!

I experimented a bit with py.execnet today and just want to share the 
result of the experiment. I wrote a little proof of concept for 
distributing a big computational task between several computers (think 
cluster) without having to install anything on them.

There are several processes involved: a server establishes 
connections to the nodes via execnet gateways and sends each node its 
task via the corresponding channel. Then the server listens on a socket. 
When a node has finished its task, it sends the server a notification 
via the socket, and the server then fetches the result via the 
appropriate channel. Then the node gets a new task via the channel.

I used the search for Mersenne primes with the Lucas-Lehmer test as the 
computational task (which one wouldn't implement in Python, of course). 
The whole implementation (server and primality test) is fewer than 130 
lines of code (see below). There are still some things missing (a 
clean shutdown, for example), but the basic concept works very well.

execnet rocks!

Regards,

Carl Friedrich


#-----------server.py--------------

import py
import SocketServer

# Address and port of the server socket; both are sent to every node so it
# can connect back and announce that a result is ready.
ip = "192.168.10.1" # ip of the server
port = 50007

# SSH targets handed to py.execnet.SshGateway, one per worker node.
# The archived copy of this script showed them as "user at host" (the usual
# mailing-list address obfuscation); they must be real "user@host" specs.
nodes = ["cfbolz@linux", "jana@albert"]

# NOTE(review): not referenced by the code in this file -- the __main__
# block passes its own chunk size (100) to TaskDistributor.
chunk_size = 50

# Source of the worker script that is executed remotely on every node.
# task.py is looked up relative to py.path.local() -- presumably the current
# working directory; confirm against the py.path documentation.
source = py.code.Source(py.path.local().join("task.py").read())

# Log producer for the server; its consumer is set to None here
# (presumably this silences the output -- see the py.log documentation).
log = py.log.Producer("server")
py.log.setconsumer("server", None)

class TaskDistributor(object):
    """Hand out consecutive chunks of Mersenne exponents to remote nodes.

    Every node gets its own execnet channel; the position of the channel in
    ``self.channels`` is the node number the node later announces itself
    with.  ``self.i`` is the first exponent that has not been handed out.
    """

    def __init__(self, nodes, chunk_size=1):
        """Start the worker script on every node and send its first chunk.

        nodes      -- SSH gateway specs, one per worker node
        chunk_size -- number of consecutive exponents per task
        """
        self.i = 1
        self.nodes = nodes
        self.chunk_size = chunk_size
        self.channels = []
        for node in nodes:
            print(node)
            channel = py.execnet.SshGateway(node).remote_exec(source)
            self.channels.append(channel)
            self._expect(channel, "online")
            # Initial handshake, in the order the worker script reads it:
            # node number, server socket address, first chunk, start delay.
            channel.send(len(self.channels) - 1)
            channel.send((ip, port))
            start, chunk = self._next_chunk()
            channel.send(chunk)
            # The worker sleeps this many seconds before it starts, which
            # gives the server time to set up the remaining nodes.
            channel.send(len(nodes) * 3)
            self._expect(channel, "ok")
            log.init("sent task %s" % (start,))

    def new_task(self, number):
        """Collect the finished results of node `number` and send new work.

        Prints every Mersenne prime contained in the results, plus one
        progress line if the last exponent of the chunk was not prime.
        Raises RuntimeError if the node does not acknowledge the new chunk.
        """
        channel = self.channels[number]
        results = channel.receive()
        channel.send("ok")
        start, chunk = self._next_chunk()
        channel.send(chunk)
        log.new_job(start)
        self._expect(channel, "ok")
        for line in self._report_lines(results, number):
            print(line)

    def _next_chunk(self):
        """Return (start, exponents) of the next chunk and advance self.i."""
        start = self.i
        self.i = start + self.chunk_size
        return start, range(start, self.i)

    def _expect(self, channel, expected):
        """Receive one item from `channel`; raise unless it equals `expected`.

        An explicit check is used instead of ``assert`` because the receive
        is part of the protocol and must not disappear under ``python -O``.
        """
        reply = channel.receive()
        if reply != expected:
            raise RuntimeError("node protocol error: expected %r, got %r"
                               % (expected, reply))

    def _report_lines(self, results, number):
        """Return the output lines for one chunk of (exponent, is_prime)."""
        lines = []
        for exponent, is_prime in results:
            if is_prime:
                lines.append("%s = 2**%s - 1 is a prime! result from node %s"
                             % (2 ** exponent - 1, exponent, number))
        # Progress marker: name the exponent that was actually tested last
        # (not the start of the next chunk), and cope with an empty chunk.
        if results and not results[-1][1]:
            lines.append("2**%s - 1 is not prime" % (results[-1][0],))
        return lines

class RequestHandler(SocketServer.BaseRequestHandler):
    """Handle one "result is ready" notification from a worker node.

    The node connects and sends its node number as text.  The handler
    answers "ok" and asks the shared distributor (the class attribute
    ``RequestHandler.distributor``, assigned in the __main__ block) to
    fetch the results and hand out a new task.
    """

    def handle(self):
        try:
            # Only the parsing step may legitimately fail with ValueError;
            # errors raised while distributing work must not be reported to
            # the node as "not a number".
            try:
                number = int(self.request.recv(10))
            except ValueError:
                self.request.send("not a number")
                return
            log.request("from", number)
            self.request.send("ok")
            RequestHandler.distributor.new_task(number)
        finally:
            # Close the socket on every path, including unexpected errors.
            log.request("closing socket")
            self.request.close()

if __name__ == '__main__':
    # All handler instances reach the distributor through this class
    # attribute; it is created first so every node is online before the
    # server socket starts accepting notifications.
    distributor = TaskDistributor(nodes, 100)
    RequestHandler.distributor = distributor
    address = (ip, port)
    server = SocketServer.TCPServer(address, RequestHandler)
    print(server)
    server.serve_forever()




#-----------task.py--------------

import operator
import random
import socket
import time
import sys

# Optional: if the psyco module is installed on the node, enable it for the
# whole program; if it cannot be imported, carry on without it.
try:
     import psyco
     psyco.full()
except ImportError:
     pass

def fermattest(p, tests=30):
    """Probabilistic Fermat primality check for `p`.

    Draws `tests` random bases a with 1 <= a < p and checks that
    a**p == a (mod p).  Returns False as soon as one base fails and True if
    all bases pass; a True result means "probably prime" only.  Values of
    `p` below 2 are never prime and return False.
    """
    if p < 2:
        return False
    for _ in range(tests):
        # randrange works on exact integers, so the base stays inside
        # [1, p) even when p is too large to be represented as a float.
        a = random.randrange(1, p)
        if pow(a, p, p) != a:
            return False
    return True

def M(p):
    """Return the Mersenne number 2**p - 1."""
    return 2 ** p - 1

def lucastest(p):
    """Lucas-Lehmer test: return True if the Mersenne number M(p) is prime.

    Starting from 4, the value is squared and decremented by 2 modulo M(p)
    p - 2 times; M(p) is prime exactly when the final value is 0.  That
    recurrence is only valid for odd p, so p == 2 (M(2) == 3, which is
    prime) is answered directly.
    """
    if p == 2:
        return True
    m = M(p)
    curr = 4
    for _ in range(p - 2):
        curr = (curr * curr - 2) % m
    return curr == 0

def process_task(task_data):
    """Test every exponent in `task_data` and return (exponent, is_prime) pairs.

    Each exponent is printed as it is processed.  The Fermat check on the
    exponent runs first; the Lucas-Lehmer test is only run for exponents
    that pass it, all others are recorded as False.
    """
    outcome = []
    for exponent in task_data:
        print(exponent)
        verdict = fermattest(exponent) and lucastest(exponent)
        outcome.append((exponent, verdict))
    return outcome

# Main script of the worker node.  `channel` is not defined in this file; it
# is expected to be supplied by the execnet gateway that runs this source.

# Initial handshake with the server: announce ourselves, then read the node
# number, the server's socket address, the first chunk and the start delay.
channel.send("online")
mynumber = str(channel.receive())
socketaddress = channel.receive()
task_data = channel.receive()
startup_delay = channel.receive()
channel.send("ok")
f = open("/tmp/out.txt", "w", 0)  # unbuffered trace file

f.write("sleeping\n")
time.sleep(startup_delay) # give the server time to establish all connections
f.write("starting to work\n")

while 1:
    result = process_task(task_data)
    f.write("%s %s\n" % (result, socketaddress))
    # The notification socket is only opened once the result is ready, and
    # it is closed on every path so a failure does not leak it.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    f.write("connecting %s\n" % (s,))
    try:
        try:
            s.connect(socketaddress)
        except Exception:
            f.write("error! %s\n" % (sys.exc_info()[1],))
            f.close()
            raise
        f.write("connected2\n")
        s.send(mynumber)
        f.write("sent number\n")
        reply = s.recv(2)
    finally:
        s.close()
    # Explicit checks instead of assert: the receive calls are part of the
    # protocol and must still happen when Python runs with -O.
    if reply != "ok":
        raise RuntimeError("server rejected notification: %r" % (reply,))
    channel.send(result)
    f.write("sent result\n")
    reply = channel.receive()
    if reply != "ok":
        raise RuntimeError("server did not accept result: %r" % (reply,))
    task_data = channel.receive()
    f.write("new task data %s\n" % (task_data,))
    channel.send("ok")





More information about the Pytest-dev mailing list