thread, multiprocessing: communication overhead

mk mrkafk at gmail.com
Tue Dec 30 10:46:29 EST 2008


Hello everyone,

This time I decided to test communication overhead in multithreaded / 
multiprocess communication. The results are rather disappointing, that 
is, communication overhead seems to be very high. In each of the 
following functions, I send 10,000 numbers to the function / 10 threads 
/ 10 processes, which simply returns it in its respective way.


Function: notfun            Best: 0.00622 sec   Average: 0.00633 sec
(simple function)

Function: threadsemfun      Best: 0.64428 sec   Average: 0.64791 sec
(10 threads synchronizing using semaphore)

Function: threadlockfun     Best: 0.66288 sec   Average: 0.66453 sec
(10 threads synchronizing using locks)

Function: procqueuefun      Best: 1.16291 sec   Average: 1.17217 sec
(10 processes communicating with main process using queues)

Function: procpoolfun       Best: 1.18648 sec   Average: 1.19577 sec
(a pool of 10 processes)

If I'm doing something wrong in the code below (something that would hurt 
performance), please point it out.

Code:

import threading
import multiprocessing
import time
import timeit


def time_fun(fun):
    """Benchmark *fun* with timeit: 10 repeats of a single call each.

    Prints the best and the average wall-clock time.  The setup line makes
    the function importable by name inside timeit's timing namespace, so
    *fun* must live in __main__.
    """
    timer = timeit.Timer(stmt=fun,
                         setup="from __main__ import " + fun.__name__)
    results = timer.repeat(repeat=10, number=1)
    best_result = min(results)
    avg = sum(results) / len(results)
    # Parenthesized single-argument print is valid in both Python 2 and 3;
    # the original used a Py2-only print statement broken by email wrapping.
    print("Function: %-15s   Best: %5.5f sec   Average: %5.5f sec"
          % (fun.__name__, best_result, avg))


def notfun(count=10000):
    """Baseline with no concurrency: drain *count* numbers from one list
    into another.

    Items are popped from the end, so the result is in descending order.
    Returns the result list (the original returned None; returning the
    list is backward-compatible and makes the function testable).
    """
    inputlist = list(range(count))
    reslist = []
    # Pop until exhausted -- clearer than iterating range(len(...)).
    while inputlist:
        reslist.append(inputlist.pop())
    return reslist

def threadsemfun(count=10000, nthreads=10):
    """Drain *count* numbers into a result list using *nthreads* threads
    synchronizing on a single semaphore.

    Returns the result list; element order depends on thread scheduling,
    but the multiset of values always equals range(count).
    """
    def worker(sem, src, dst):
        # IndexError from pop() on the emptied source list is the
        # termination signal; try/finally guarantees the semaphore is
        # released on every path (the original released it manually).
        while True:
            sem.acquire()
            try:
                dst.append(src.pop())
            except IndexError:
                return
            finally:
                sem.release()

    inputlist = list(range(count))
    reslist = []
    sem = threading.Semaphore()
    threads = [
        threading.Thread(target=worker, args=(sem, inputlist, reslist))
        for _ in range(nthreads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return reslist


def threadlockfun(count=10000, nthreads=10):
    """Drain *count* numbers into a result list using *nthreads* threads
    synchronizing on a single mutex lock.

    Returns the result list; element order depends on thread scheduling,
    but the multiset of values always equals range(count).
    """
    def worker(lock, src, dst):
        while True:
            # 'with' guarantees the lock is released on every exit path.
            with lock:
                if not src:
                    return
                dst.append(src.pop())

    inputlist = list(range(count))
    reslist = []
    # Fix: the original created threading.Semaphore() here despite the
    # function's name and the benchmark label "synchronizing using locks";
    # a Lock is the primitive actually being measured.
    lock = threading.Lock()
    threads = [
        threading.Thread(target=worker, args=(lock, inputlist, reslist))
        for _ in range(nthreads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return reslist

def pf(x):
         """Identity worker for the process pool: return *x* unchanged.

         Must stay at module level (not nested inside procpoolfun) so that
         multiprocessing.Pool can pickle it when dispatching work to the
         worker processes.
         """
         return x

def procpoolfun(count=10000, nworkers=10, chunksize=10):
    """Echo *count* numbers through a pool of *nworkers* processes,
    *chunksize* numbers per map_async round trip.

    Returns the result list.  Fixes two defects in the original:
    the 'while j <= jmax' sliding window silently dropped the final
    partial chunk whenever the input length was not a multiple of the
    chunk size, and the pool was never closed/joined, leaking worker
    processes on every call.
    """
    pool = multiprocessing.Pool(processes=nworkers)
    try:
        inputlist = list(range(count))
        reslist = []
        # range() with a step covers the trailing partial chunk too.
        for start in range(0, len(inputlist), chunksize):
            async_res = pool.map_async(pf, inputlist[start:start + chunksize])
            reslist.extend(async_res.get())
    finally:
        # Always reap the workers, even if a round trip raises.
        pool.close()
        pool.join()
    return reslist

def procqueuefun():
         """Echo 10,000 numbers through 10 worker processes, communicating
         over a pair of multiprocessing.Queue objects (10 items per batch).

         NOTE(review): the send/receive loop assumes len(inputlist) is an
         exact multiple of 10; with any other length the 'while x != ilen'
         test would be skipped past and the loop would run off the end.
         NOTE(review): pqf is a nested function used as a Process target --
         this presumably relies on the fork start method (it cannot be
         pickled under the spawn start method); confirm on non-Unix hosts.
         """
         def pqf(qin, qout):
                 # Worker loop: echo each item back tagged with this
                 # worker's pid; the string 'STOP' is the shutdown sentinel.
                 pid = multiprocessing.current_process().pid
                 while True:
                         x = qin.get()
                         if x == 'STOP':
                                 return
                         qout.put((pid, x))
         qin = multiprocessing.Queue()
         qout = multiprocessing.Queue()
         plist = []
         activity = dict()
         # Start the 10 workers before feeding any input.
         for i in range(0,10):
                 p = multiprocessing.Process(target = pqf, args=(qin, qout))
                 p.start()
                 plist.append(p)
                 activity[p.pid] = 0
         inputlist = range(0,10000)
         reslist = []
         #print "before:", sum(inputlist)
         ilen = len(inputlist)
         x = 0
         # Send and receive in lock-step batches of 10 so the queues stay
         # small instead of buffering all 10,000 items at once.
         while x != ilen:
                 for i in range(0,10):
                         qin.put(inputlist[x+i])
                 for i in range(0,10):
                         pid, res = qout.get()
                         #activity[pid] = activity[pid] + 1
                         reslist.append(res)
                 x += 10
         # One sentinel per worker shuts them all down.
         for i in range(0,10):
                 qin.put('STOP')
         for i in range(len(plist)):
                 plist[i].join()

         #print "after:", sum(reslist)
         #print "activity", activity

if __name__ == "__main__":
    # Benchmark each variant in turn, from the plain-function baseline to
    # the multiprocessing versions.
    for benchmark in (notfun, threadsemfun, threadlockfun,
                      procqueuefun, procpoolfun):
        time_fun(benchmark)






More information about the Python-list mailing list