[SciPy-user] shared memory machines
Gael Varoquaux
gael.varoquaux at normalesup.org
Sun Feb 1 10:29:40 EST 2009
On Sun, Feb 01, 2009 at 10:03:30AM -0500, Gideon Simpson wrote:
> Yes, but I'm talking about when you have a multiprocessor/multicore
> system, not a commodity cluster. In these shared memory
> configurations, were I using compiled code, I'd be able to use OpenMP
> to take advantage of the additional cores/processors. I'm wondering
> if anyone has looked at ways to take advantage of such configurations
> with scipy.
I use the multiprocessing module:
http://docs.python.org/library/multiprocessing.html
I also have some code to share arrays between processes. I'd love to
submit it for integration with numpy, but first I'd like it to get more
exposure so that any flaws in the API are found. I am attaching
it.
Actually I wrote this code a few months ago, and now that I am looking at
it, I realise that the SharedMemArray should probably be a subclass of
numpy.ndarray, and implement the full array signature. I am not sure if
this is possible or not (i.e., whether it will still be easy to have
multiprocessing share the data between processes). I don't really
have time to polish this properly; does anybody want to have a go?
Gaël
> On Feb 1, 2009, at 4:57 AM, Gael Varoquaux wrote:
> > On Sun, Feb 01, 2009 at 12:37:48AM -0500, Gideon Simpson wrote:
> >> Has anyone been able to take advantage of shared memory machines with
> >> scipy? How did you do it?
> > I am not sure I understand your question. You want to do parallel
> > computing and share the arrays between processes, is that it?
-------------- next part --------------
"""
Small helper module to share arrays between processes without copying
data.
Numpy arrays can be converted to shared memory arrays, which implement
the array protocol, but are allocated in memory that can be
shared transparently by the multiprocessing module.
"""
# Author: Gael Varoquaux <gael dot varoquaux at normalesup dot org>
# Copyright: Gael Varoquaux
# License: BSD
import numpy as np
import multiprocessing
import ctypes
# Mapping from ctypes scalar types to the numpy scalar type with the same
# memory layout.  The widths of C types such as ``long`` and ``wchar_t``
# depend on the platform (``long`` is 4 bytes on Windows but 8 on 64-bit
# Linux/macOS), so numeric types are resolved through numpy rather than
# hard-coded.
_NUMERIC_CTYPES = (
    ctypes.c_byte, ctypes.c_ubyte,
    ctypes.c_short, ctypes.c_ushort,
    ctypes.c_int, ctypes.c_uint,
    ctypes.c_long, ctypes.c_ulong,
    ctypes.c_longlong, ctypes.c_ulonglong,
    ctypes.c_float, ctypes.c_double,
)
_ctypes_to_numpy = dict((ctype, np.dtype(ctype).type)
                        for ctype in _NUMERIC_CTYPES)
# Character types have no numeric numpy equivalent; expose them as signed
# integers of the same width.
_ctypes_to_numpy[ctypes.c_char] = np.int8
_ctypes_to_numpy[ctypes.c_wchar] = np.dtype(
    'i%d' % ctypes.sizeof(ctypes.c_wchar)).type

# Reverse mapping, used to pick the ctypes element type when allocating a
# shared buffer for a numpy array.  Character types are left out so that,
# e.g., ``np.int8`` maps to ``c_byte``.  Iterating in reverse means the
# *first* ctype listed in _NUMERIC_CTYPES wins when several ctypes resolve
# to the same numpy type.
_numpy_to_ctypes = dict((np.dtype(ctype).type, ctype)
                        for ctype in reversed(_NUMERIC_CTYPES))
def shmem_as_ndarray(data, dtype=float):
    """ Return an ndarray view on the memory of a shared ctypes array.

    Parameters
    ----------
    data : ctypes array
        The buffer to view, typically the ``multiprocessing.RawArray``
        returned by ``ndarray_to_shmem``.
    dtype : numpy dtype, optional
        Element type used to interpret the raw bytes (default float64).

    Returns
    -------
    numpy.ndarray
        A flat array sharing memory with ``data`` (no copy is made).
        Trailing bytes that do not fill a whole element are ignored.
    """
    dtype = np.dtype(dtype)
    # ctypes.sizeof gives the buffer length in bytes for any ctypes array.
    # The private ``_wrapper.get_size()`` used previously does not exist in
    # Python 3.  Floor division keeps ``count`` an integer.
    size = ctypes.sizeof(data) // dtype.itemsize
    return np.frombuffer(data, dtype=dtype, count=size)
def ndarray_to_shmem(arr):
    """ Copy a numpy.ndarray into a new shared-memory ctypes array.

    The data is copied and flattened in C (row-major) order, so
    non-contiguous inputs are supported.

    Parameters
    ----------
    arr : numpy.ndarray
        Array whose dtype has an entry in ``_numpy_to_ctypes``.

    Returns
    -------
    multiprocessing.RawArray
        Shared buffer (without a lock) holding ``arr.size`` elements.

    Raises
    ------
    KeyError
        If ``arr.dtype`` has no ctypes equivalent in ``_numpy_to_ctypes``.
    TypeError
        If the mapped ctypes type does not have the same width as the
        dtype; copying would otherwise overrun or truncate the buffer.
    """
    # A C-contiguous flat copy guarantees that ``flat.ctypes.data`` points
    # at ``flat.nbytes`` bytes in logical order; ``reshape(-1)`` alone can
    # return a strided view of the input.
    flat = np.ascontiguousarray(arr).ravel()
    ctype = _numpy_to_ctypes[flat.dtype.type]
    if ctypes.sizeof(ctype) != flat.dtype.itemsize:
        raise TypeError(
            "ctypes type %s is %d bytes but dtype %s is %d bytes"
            % (ctype.__name__, ctypes.sizeof(ctype),
               flat.dtype, flat.dtype.itemsize))
    data = multiprocessing.RawArray(ctype, flat.size)
    # Copy ``nbytes`` bytes.  ``len(arr.data)`` counts elements, not bytes.
    # The source is passed as a raw address because memmove cannot take a
    # memoryview.
    ctypes.memmove(data, flat.ctypes.data, flat.nbytes)
    return data
def test_ndarray_conversion():
    """ Round trip ndarray -> shared memory -> ndarray keeps the values.
    """
    original = np.random.random((100, ))
    shared = ndarray_to_shmem(original)
    np.testing.assert_almost_equal(original, shmem_as_ndarray(shared))
def test_conversion_non_flat():
    """ A 2-D array comes back flattened after the round trip.
    """
    matrix = np.random.random((100, 2))
    expected = matrix.flatten()
    round_tripped = shmem_as_ndarray(ndarray_to_shmem(matrix))
    np.testing.assert_almost_equal(expected, round_tripped)
def test_conversion_non_contiguous():
    """ A transposed (non-contiguous) array survives the round trip when
    read back with its own dtype.
    """
    transposed = np.indices((3, 3, 3)).T
    expected = transposed.flatten()
    shared = ndarray_to_shmem(transposed)
    restored = shmem_as_ndarray(shared, dtype=transposed.dtype)
    np.testing.assert_almost_equal(expected, restored)
def test_no_copy():
    """ Two views obtained from the same shared buffer see each other's
    writes, i.e. shmem_as_ndarray does not copy the data.
    """
    shared = ndarray_to_shmem(np.random.random((100, )))
    first_view = shmem_as_ndarray(shared)
    second_view = shmem_as_ndarray(shared)
    for value in (1, 0):
        first_view[0] = value
        np.testing.assert_equal(first_view[0], second_view[0])
################################################################################
# A class to carry around the relevant information
################################################################################
class SharedMemArray(object):
    """ Wrapper around a shared-memory ctypes array that lets a numpy array
    be shared across processes.

    An instance can be passed as an argument to ``multiprocessing.Process``.
    Each side then gets an ndarray view on the same memory through
    ``asarray()`` or ``numpy.asarray(obj)``.
    """

    def __init__(self, arr):
        """ Initialize a shared array from a numpy array.

        The data is copied into shared memory, so later changes to ``arr``
        are not reflected.
        """
        self.data = ndarray_to_shmem(arr)
        self.dtype = arr.dtype
        self.shape = arr.shape

    def __array__(self, dtype=None, copy=None):
        """ Implement the numpy array protocol.

        Return a view on the shared memory (no copy), unless ``copy`` is
        true or a conversion to a different ``dtype`` requires one.  The
        ``dtype``/``copy`` parameters follow the signature NumPy passes to
        ``__array__``.
        """
        arr = shmem_as_ndarray(self.data, dtype=self.dtype)
        # The flat buffer view is contiguous, so reshape returns a view and
        # writes still land in shared memory.
        arr = arr.reshape(self.shape)
        if dtype is not None and np.dtype(dtype) != arr.dtype:
            if copy is False:
                raise ValueError(
                    "Converting SharedMemArray to dtype %s requires a copy"
                    % np.dtype(dtype))
            return arr.astype(dtype)
        if copy:
            return arr.copy()
        return arr

    def asarray(self):
        """ Return an ndarray view on the shared memory (no copy). """
        return self.__array__()
def _negate_shared(shared):
    """ Worker for test_sharing_array: negate a SharedMemArray in place.

    Defined at module level so it can be pickled when multiprocessing uses
    the 'spawn' or 'forkserver' start methods.
    """
    view = shared.asarray()
    view *= -1


def test_sharing_array():
    """ Check that a SharedMemArray modified in a child process is changed
    in the parent too, i.e. the memory is really shared.
    """
    a = np.random.random((10, 3, 1))
    arr = SharedMemArray(a)
    # b is a view on the shared memory.  It starts out equal to a, whose own
    # buffer is private to this process and is not touched by the child.
    b = arr.asarray()
    np.testing.assert_array_equal(a, b)
    # Process.run() would call the target in *this* process; start()/join()
    # really runs it in a child, which is what this test must exercise.
    worker = multiprocessing.Process(target=_negate_shared, args=(arr,))
    worker.start()
    worker.join()
    assert worker.exitcode == 0
    np.testing.assert_equal(-b, a)
if __name__ == '__main__':
    # Run every module-level ``test_*`` function through the standard-library
    # unittest runner.  nose, used previously, is unmaintained and fails to
    # import on Python >= 3.10.
    import sys
    import unittest
    suite = unittest.TestSuite(
        unittest.FunctionTestCase(func)
        for name, func in sorted(globals().items())
        if name.startswith('test_') and callable(func))
    result = unittest.TextTestRunner(verbosity=2).run(suite)
    sys.exit(not result.wasSuccessful())
More information about the SciPy-User
mailing list