[pypy-svn] r77216 - in pypy/branch/fast-forward/pypy/module/_multiprocessing: . test

afa at codespeak.net afa at codespeak.net
Mon Sep 20 19:56:30 CEST 2010


Author: afa
Date: Mon Sep 20 19:56:28 2010
New Revision: 77216

Added:
   pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_connection.py
Modified:
   pypy/branch/fast-forward/pypy/module/_multiprocessing/__init__.py
   pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_connection.py
   pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_win32.py
Log:
Start implementing _multiprocessing connection objects.
Does not work so far.


Modified: pypy/branch/fast-forward/pypy/module/_multiprocessing/__init__.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_multiprocessing/__init__.py	(original)
+++ pypy/branch/fast-forward/pypy/module/_multiprocessing/__init__.py	Mon Sep 20 19:56:28 2010
@@ -5,6 +5,7 @@
 
     interpleveldefs = {
         'Connection'      : 'interp_connection.W_SocketConnection',
+        'PipeConnection'  : 'interp_connection.W_PipeConnection',
     }
 
     appleveldefs = {

Modified: pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_connection.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_connection.py	(original)
+++ pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_connection.py	Mon Sep 20 19:56:28 2010
@@ -1,10 +1,14 @@
-from pypy.interpreter.baseobjspace import Wrappable
+from pypy.interpreter.baseobjspace import ObjSpace, Wrappable, W_Root
 from pypy.interpreter.typedef import TypeDef, GetSetProperty
+from pypy.interpreter.gateway import interp2app, unwrap_spec
+import sys
 
 INVALID_HANDLE_VALUE = -1
 READABLE = 1
 WRITABLE = 2
 
+PY_SSIZE_T_MAX = sys.maxint
+PY_SSIZE_T_MIN = -sys.maxint - 1
 
 class W_BaseConnection(Wrappable):
     def __init__(self, handle, flags):
@@ -32,12 +36,110 @@
     def writable_get(space, self):
         return space.w_bool(self.flags & WRITABLE)
 
-class W_SocketConnection(W_BaseConnection):
-    pass
+    @unwrap_spec('self', ObjSpace, 'bufferstr', 'index', 'index')
+    def send_bytes(self, space, buffer, offset=0, size=PY_SSIZE_T_MIN):
+        length = len(buffer)
+        self._check_writable()
+        if offset < 0:
+            raise OperationError(space.w_ValueError,
+                                 space.wrap("offset is negative"))
+        if length < offset:
+            raise OperationError(space.w_ValueError,
+                                 space.wrap("buffer length < offset"))
+
+        if size == PY_SSIZE_T_MIN:
+            size = length - offset
+        elif size < 0:
+            raise OperationError(space.w_ValueError,
+                                 space.wrap("size is negative"))
+        elif offset + size > length:
+            raise OperationError(space.w_ValueError,
+                                 space.wrap("buffer length > offset + size"))
+
+        res = self.do_send_string(buffer, offset, size)
+        if res < 0:
+            raise mp_error(res)
+
+    @unwrap_spec('self', ObjSpace, 'index')
+    def recv_bytes(self, space, maxlength=sys.maxint):
+        self._check_readable()
+        if maxlength < 0:
+            raise OperationError(space.w_ValueError,
+                                 space.wrap("maxlength < 0"))
+
+        try:
+            res, newbuf = self.do_recv_string(maxlength)
+
+            if res < 0:
+                if res == MP_BAD_MESSAGE_LENGTH:
+                    self.flags &= ~READABLE
+                    if self.flags == 0:
+                        self.close()
+                raise mp_error(res)
+
+            if newbuf is not None:
+                return space.wrap(rffi.charp2str(newbuf, res))
+            else:
+                return space.wrap(rffi.charp2str(self.buffer, res))
+            return result
+        finally:
+            if newbuf is not None:
+                rffi.free_charp(newbuf)
+
+    @unwrap_spec('self', ObjSpace, W_Root, 'index')
+    def recv_bytes_into(self, space, w_buffer, offset=0):
+        rwbuffer = space.rwbuffer_w(w_buffer)
+        length = rwbuffer.getlength()
+
+        try:
+            res, newbuf = self.do_recv_string(length - offset)
+
+            if res < 0:
+                if res == MP_BAD_MESSAGE_LENGTH:
+                    self.flags &= ~READABLE
+                    if self.flags == 0:
+                        self.close()
+                raise mp_error(res)
+
+            if res > length - offset:
+                raise OperationError(BufferTooShort)
+            if newbuf is not None:
+                rwbuffer.setslice(offset, newbuf)
+            else:
+                rwbuffer.setslice(offset, self.buffer)
+        finally:
+            if newbuf is not None:
+                rffi.free_charp(newbuf)
 
-W_SocketConnection.typedef = TypeDef(
-    'Connection',
+        return space.wrap(res)
+
+
+base_typedef = TypeDef(
+    'BaseConnection',
     closed = GetSetProperty(W_BaseConnection.closed_get),
     readable = GetSetProperty(W_BaseConnection.readable_get),
     writable = GetSetProperty(W_BaseConnection.writable_get),
+
+    send_bytes = interp2app(W_BaseConnection.send_bytes),
+    recv_bytes = interp2app(W_BaseConnection.recv_bytes),
+    recv_bytes_into = interp2app(W_BaseConnection.recv_bytes_into),
+    ## send = interp2app(W_BaseConnection.send),
+    ## recv = interp2app(W_BaseConnection.recv),
+    ## poll = interp2app(W_BaseConnection.poll),
+    ## fileno = interp2app(W_BaseConnection.fileno),
+    ## close = interp2app(W_BaseConnection.close),
+    )
+
+class W_SocketConnection(W_BaseConnection):
+    pass
+
+W_SocketConnection.typedef = TypeDef(
+    'Connection', base_typedef
+)
+
+class W_PipeConnection(W_BaseConnection):
+    pass
+
+W_PipeConnection.typedef = TypeDef(
+    'PipeConnection', base_typedef
 )

Modified: pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_win32.py
==============================================================================
--- pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_win32.py	(original)
+++ pypy/branch/fast-forward/pypy/module/_multiprocessing/interp_win32.py	Mon Sep 20 19:56:28 2010
@@ -58,6 +58,9 @@
         rwin32.DWORD, rwin32.DWORD, rwin32.HANDLE],
     rwin32.HANDLE)
 
+_ExitProcess = rwin32.winexternal(
+    'ExitProcess', [rffi.UINT], lltype.Void)
+
 def CloseHandle(space, w_handle):
     handle = handle_w(space, w_handle)
     if not rwin32.CloseHandle(handle):
@@ -129,6 +132,10 @@
 
     return w_handle(space, handle)
 
+@unwrap_spec(ObjSpace, r_uint)
+def ExitProcess(space, code):
+    _ExitProcess(code)
+
 def win32_namespace(space):
     "NOT_RPYTHON"
     w_win32 = space.call_function(space.w_type,
@@ -148,6 +155,7 @@
     for name in ['CloseHandle', 'GetLastError', 'CreateFile',
                  'CreateNamedPipe', 'ConnectNamedPipe',
                  'SetNamedPipeHandleState',
+                 'ExitProcess',
                  ]:
         function = globals()[name]
         w_function = space.wrap(interp2app(function))

Added: pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_connection.py
==============================================================================
--- (empty file)
+++ pypy/branch/fast-forward/pypy/module/_multiprocessing/test/test_connection.py	Mon Sep 20 19:56:28 2010
@@ -0,0 +1,31 @@
+import py
+import sys
+from pypy.conftest import gettestobjspace
+
+class TestConnection:
+    def test_simple(self):
+        from pypy.module._multiprocessing import interp_connection
+
+class AppTestConnection:
+    def setup_class(cls):
+        if sys.platform != "win32":
+            py.test.skip("win32 only")
+        cls.space = gettestobjspace(usemodules=('thread', '_multiprocessing',
+                                                #'_rawffi', # on win32
+                                                ))
+        if sys.platform == "win32":
+            space = cls.space
+            space.setitem(space.sys.get('modules'),
+                          space.wrap('msvcrt'), space.sys)
+            space.setitem(space.sys.get('modules'),
+                          space.wrap('_subprocess'), space.sys)
+
+    def test_pipe_connection(self):
+        import multiprocessing
+        obj = [1, 2.0, "hello"]
+        whandle, rhandle = multiprocessing.Pipe()
+        whandle.send(obj)
+        obj2 = rhandle.recv()
+        assert obj == obj2
+
+



More information about the Pypy-commit mailing list