[pypy-commit] pypy default: readd _minimal_curses

fijal noreply at buildbot.pypy.org
Wed Apr 3 11:10:02 CEST 2013


Author: Maciej Fijalkowski <fijall at gmail.com>
Branch: 
Changeset: r62959:3dcd89310152
Date: 2013-04-03 11:08 +0200
http://bitbucket.org/pypy/pypy/changeset/3dcd89310152/

Log:	readd _minimal_curses

diff --git a/pypy/config/pypyoption.py b/pypy/config/pypyoption.py
--- a/pypy/config/pypyoption.py
+++ b/pypy/config/pypyoption.py
@@ -31,7 +31,7 @@
     ["_socket", "unicodedata", "mmap", "fcntl", "_locale", "pwd",
      "rctime" , "select", "zipimport", "_lsprof",
      "crypt", "signal", "_rawffi", "termios", "zlib", "bz2",
-     "struct", "_hashlib", "_md5", "_sha", "cStringIO",
+     "struct", "_hashlib", "_md5", "_sha", "_minimal_curses", "cStringIO",
      "thread", "itertools", "pyexpat", "_ssl", "cpyext", "array",
      "binascii", "_multiprocessing", '_warnings',
      "_collections", "_multibytecodec", "micronumpy", "_ffi",
@@ -45,7 +45,7 @@
      "binascii",
      # the following are needed for pyrepl (and hence for the
      # interactive prompt/pdb)
-     "termios",
+     "termios", "_minimal_curses",
      ]))
 
 working_oo_modules = default_modules.copy()
@@ -62,6 +62,7 @@
     del working_modules["fcntl"]
     del working_modules["pwd"]
     del working_modules["termios"]
+    del working_modules["_minimal_curses"]
 
     # The _locale module is needed by site.py on Windows
     default_modules["_locale"] = None
@@ -71,6 +72,7 @@
     del working_modules['rctime'] # depend on ctypes, missing tm_zone/tm_gmtoff
     del working_modules['signal'] # depend on ctypes, can't get at c-level 'errono'
     del working_modules['fcntl']  # LOCK_NB not defined
+    del working_modules["_minimal_curses"]
     del working_modules["termios"]
     del working_modules["_multiprocessing"]   # depends on rctime
 
@@ -101,6 +103,7 @@
     "pyexpat"   : ["pypy.module.pyexpat.interp_pyexpat"],
     "_ssl"      : ["pypy.module._ssl.interp_ssl"],
     "_hashlib"  : ["pypy.module._ssl.interp_ssl"],
+    "_minimal_curses": ["pypy.module._minimal_curses.fficurses"],
     "_continuation": ["rpython.rlib.rstacklet"],
     }
 
diff --git a/pypy/module/_minimal_curses/__init__.py b/pypy/module/_minimal_curses/__init__.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_minimal_curses/__init__.py
@@ -0,0 +1,32 @@
+try:
+    import _curses
+except ImportError:
+    try:
+        import _minimal_curses as _curses   # when running on top of pypy-c
+    except ImportError:
+        import py
+        py.test.skip("no _curses or _minimal_curses module") #no _curses at all
+
+from pypy.interpreter.mixedmodule import MixedModule
+from pypy.module._minimal_curses import fficurses  # for side effects
+
+
+class Module(MixedModule):
+    """ Low-level interface for curses module,
+    not meant to be used directly
+    """
+
+    appleveldefs = {
+        'error'          : 'app_curses.error',
+    }
+
+    interpleveldefs = {
+        'setupterm'      : 'interp_curses.setupterm',
+        'tigetstr'       : 'interp_curses.tigetstr',
+        'tparm'          : 'interp_curses.tparm',
+    }
+
+for i in dir(_curses):
+    val = getattr(_curses, i)
+    if i.isupper() and type(val) is int:
+        Module.interpleveldefs[i] = "space.wrap(%s)" % val
diff --git a/pypy/module/_minimal_curses/app_curses.py b/pypy/module/_minimal_curses/app_curses.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_minimal_curses/app_curses.py
@@ -0,0 +1,3 @@
+
+class error(Exception):
+    pass
diff --git a/pypy/module/_minimal_curses/fficurses.py b/pypy/module/_minimal_curses/fficurses.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_minimal_curses/fficurses.py
@@ -0,0 +1,113 @@
+
+""" The ffi for rpython, needs to be imported for side effects
+"""
+
+from rpython.rtyper.lltypesystem import rffi
+from rpython.rtyper.lltypesystem import lltype
+from rpython.rtyper.tool import rffi_platform
+from rpython.rtyper.extfunc import register_external
+from pypy.module._minimal_curses import interp_curses
+from rpython.translator.tool.cbuild import ExternalCompilationInfo
+from sys import platform
+import os.path
+
+_CYGWIN = platform == 'cygwin'
+_NCURSES_CURSES = os.path.isfile("/usr/include/ncurses/curses.h") 
+
+if _CYGWIN or _NCURSES_CURSES:
+    eci = ExternalCompilationInfo(
+        includes = ['ncurses/curses.h', 'ncurses/term.h'],
+        libraries = ['curses'],
+    )
+else:
+    eci = ExternalCompilationInfo(
+        includes = ['curses.h', 'term.h'],
+        libraries = ['curses'],
+    )
+
+rffi_platform.verify_eci(eci)
+
+
+INT = rffi.INT
+INTP = lltype.Ptr(lltype.Array(INT, hints={'nolength':True}))
+c_setupterm = rffi.llexternal('setupterm', [rffi.CCHARP, INT, INTP], INT,
+                              compilation_info=eci)
+c_tigetstr = rffi.llexternal('tigetstr', [rffi.CCHARP], rffi.CCHARP,
+                             compilation_info=eci)
+c_tparm = rffi.llexternal('tparm', [rffi.CCHARP, INT, INT, INT, INT, INT,
+                                    INT, INT, INT, INT], rffi.CCHARP,
+                          compilation_info=eci)
+
+ERR = rffi.CConstant('ERR', lltype.Signed)
+OK = rffi.CConstant('OK', lltype.Signed)
+
+def curses_setupterm(term, fd):
+    intp = lltype.malloc(INTP.TO, 1, flavor='raw')
+    err = rffi.cast(lltype.Signed, c_setupterm(term, fd, intp))
+    try:
+        if err == ERR:
+            errret = rffi.cast(lltype.Signed, intp[0])
+            if errret == 0:
+                msg = "setupterm: could not find terminal"
+            elif errret == -1:
+                msg = "setupterm: could not find terminfo database"
+            else:
+                msg = "setupterm: unknown error"
+            raise interp_curses.curses_error(msg)
+        interp_curses.module_info.setupterm_called = True
+    finally:
+        lltype.free(intp, flavor='raw')
+
+def curses_setupterm_null_llimpl(fd):
+    curses_setupterm(lltype.nullptr(rffi.CCHARP.TO), fd)
+
+def curses_setupterm_llimpl(term, fd):
+    ll_s = rffi.str2charp(term)
+    try:
+        curses_setupterm(ll_s, fd)
+    finally:
+        rffi.free_charp(ll_s)
+
+register_external(interp_curses._curses_setupterm_null,
+                  [int], llimpl=curses_setupterm_null_llimpl,
+                  export_name='_curses.setupterm_null')
+register_external(interp_curses._curses_setupterm,
+                  [str, int], llimpl=curses_setupterm_llimpl,
+                  export_name='_curses.setupterm')
+
+def check_setup_invoked():
+    if not interp_curses.module_info.setupterm_called:
+        raise interp_curses.curses_error("must call (at least) setupterm() first")
+
+def tigetstr_llimpl(cap):
+    check_setup_invoked()
+    ll_cap = rffi.str2charp(cap)
+    try:
+        ll_res = c_tigetstr(ll_cap)
+        num = lltype.cast_ptr_to_int(ll_res)
+        if num == 0 or num == -1:
+            raise interp_curses.TermError()
+        res = rffi.charp2str(ll_res)
+        return res
+    finally:
+        rffi.free_charp(ll_cap)
+
+register_external(interp_curses._curses_tigetstr, [str], str,
+                  export_name='_curses.tigetstr', llimpl=tigetstr_llimpl)
+
+def tparm_llimpl(s, args):
+    check_setup_invoked()
+    l = [0, 0, 0, 0, 0, 0, 0, 0, 0]
+    for i in range(min(len(args), 9)):
+        l[i] = args[i]
+    ll_s = rffi.str2charp(s)
+    # XXX nasty trick stolen from CPython
+    ll_res = c_tparm(ll_s, l[0], l[1], l[2], l[3], l[4], l[5], l[6],
+                     l[7], l[8])
+    rffi.free_charp(ll_s)
+    res = rffi.charp2str(ll_res)
+    return res
+
+register_external(interp_curses._curses_tparm, [str, [int]], str,
+                  export_name='_curses.tparm', llimpl=tparm_llimpl)
+
diff --git a/pypy/module/_minimal_curses/interp_curses.py b/pypy/module/_minimal_curses/interp_curses.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_minimal_curses/interp_curses.py
@@ -0,0 +1,94 @@
+
+from pypy.interpreter.gateway import unwrap_spec
+from pypy.interpreter.error import OperationError
+from pypy.module._minimal_curses import _curses
+
+class ModuleInfo:
+    def __init__(self):
+        self.setupterm_called = False
+
+module_info = ModuleInfo()
+
+class curses_error(Exception):
+    def __init__(self, msg):
+        self.msg = msg
+
+from rpython.annotator.classdef import FORCE_ATTRIBUTES_INTO_CLASSES
+from rpython.annotator.model import SomeString
+
+# this is necessary due to annmixlevel
+FORCE_ATTRIBUTES_INTO_CLASSES[curses_error] = {'msg': SomeString()}
+
+def convert_error(space, error):
+    msg = error.msg
+    w_module = space.getbuiltinmodule('_minimal_curses')
+    w_exception_class = space.getattr(w_module, space.wrap('error'))
+    w_exception = space.call_function(w_exception_class, space.wrap(msg))
+    return OperationError(w_exception_class, w_exception)
+
+def _curses_setupterm_null(fd):
+    # NOT_RPYTHON
+    try:
+        _curses.setupterm(None, fd)
+    except _curses.error, e:
+        raise curses_error(e.args[0])
+
+def _curses_setupterm(termname, fd):
+    # NOT_RPYTHON
+    try:
+        _curses.setupterm(termname, fd)
+    except _curses.error, e:
+        raise curses_error(e.args[0])
+
+@unwrap_spec(fd=int)
+def setupterm(space, w_termname=None, fd=-1):
+    if fd == -1:
+        w_stdout = space.getattr(space.getbuiltinmodule('sys'),
+                                 space.wrap('stdout'))
+        fd = space.int_w(space.call_function(space.getattr(w_stdout,
+                                             space.wrap('fileno'))))
+    try:
+        if space.is_none(w_termname):
+            _curses_setupterm_null(fd)
+        else:
+            _curses_setupterm(space.str_w(w_termname), fd)
+    except curses_error, e:
+        raise convert_error(space, e)
+
+class TermError(Exception):
+    pass
+
+def _curses_tigetstr(capname):
+    # NOT_RPYTHON
+    try:
+        res = _curses.tigetstr(capname)
+    except _curses.error, e:
+        raise curses_error(e.args[0])
+    if res is None:
+        raise TermError
+    return res
+
+def _curses_tparm(s, args):
+    # NOT_RPYTHON
+    try:
+        return _curses.tparm(s, *args)
+    except _curses.error, e:
+        raise curses_error(e.args[0])
+
+@unwrap_spec(capname=str)
+def tigetstr(space, capname):
+    try:
+        result = _curses_tigetstr(capname)
+    except TermError:
+        return space.w_None
+    except curses_error, e:
+        raise convert_error(space, e)
+    return space.wrap(result)
+
+@unwrap_spec(s=str)
+def tparm(space, s, args_w):
+    args = [space.int_w(a) for a in args_w]
+    try:
+        return space.wrap(_curses_tparm(s, args))
+    except curses_error, e:
+        raise convert_error(space, e)
diff --git a/pypy/module/_minimal_curses/test/__init__.py b/pypy/module/_minimal_curses/test/__init__.py
new file mode 100644
diff --git a/pypy/module/_minimal_curses/test/test_curses.py b/pypy/module/_minimal_curses/test/test_curses.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_minimal_curses/test/test_curses.py
@@ -0,0 +1,108 @@
+from pypy.conftest import pypydir
+from rpython.tool.udir import udir
+import py
+import sys
+# tests here are run as snippets in a python subprocess spawned via pexpect
+
+
+def setup_module(mod):
+    try:
+        import curses
+        curses.setupterm()
+    except:
+        py.test.skip("Cannot test this here")
+
+class TestCurses(object):
+    """ We need to fork here, to prevent
+    the setup from being done
+    """
+    def _spawn(self, *args, **kwds):
+        import pexpect
+        kwds.setdefault('timeout', 600)
+        print 'SPAWN:', args, kwds
+        child = pexpect.spawn(*args, **kwds)
+        child.logfile = sys.stdout
+        return child
+
+    def spawn(self, argv):
+        py_py = py.path.local(pypydir).join('bin', 'pyinteractive.py')
+        return self._spawn(sys.executable, [str(py_py)] + argv)
+
+    def setup_class(self):
+        try:
+            import pexpect
+        except ImportError:
+            py.test.skip('pexpect not found')
+
+    def test_setupterm(self):
+        source = py.code.Source("""
+        import _minimal_curses
+        try:
+            _minimal_curses.tigetstr('cup')
+        except _minimal_curses.error:
+            print 'ok!'
+        """)
+        f = udir.join("test_setupterm.py")
+        f.write(source)
+        child = self.spawn(['--withmod-_minimal_curses', str(f)])
+        child.expect('ok!')
+
+    def test_tigetstr(self):
+        source = py.code.Source("""
+        import _minimal_curses
+        _minimal_curses.setupterm()
+        assert _minimal_curses.tigetstr('cup') == '\x1b[%i%p1%d;%p2%dH'
+        print 'ok!'
+        """)
+        f = udir.join("test_tigetstr.py")
+        f.write(source)
+        child = self.spawn(['--withmod-_minimal_curses', str(f)])
+        child.expect('ok!')
+
+    def test_tparm(self):
+        source = py.code.Source("""
+        import _minimal_curses
+        _minimal_curses.setupterm()
+        assert _minimal_curses.tparm(_minimal_curses.tigetstr('cup'), 5, 3) == '\033[6;4H'
+        print 'ok!'
+        """)
+        f = udir.join("test_tparm.py")
+        f.write(source)
+        child = self.spawn(['--withmod-_minimal_curses', str(f)])
+        child.expect('ok!')
+        
+class TestCCurses(object):
+    """ Test compiled version
+    """
+    def test_csetupterm(self):
+        from rpython.translator.c.test.test_genc import compile
+        from pypy.module._minimal_curses import interp_curses
+        def runs_setupterm():
+            interp_curses._curses_setupterm_null(1)
+
+        fn = compile(runs_setupterm, [])
+        fn()
+
+    def test_ctgetstr(self):
+        from rpython.translator.c.test.test_genc import compile
+        from pypy.module._minimal_curses import interp_curses
+        def runs_ctgetstr():
+            interp_curses._curses_setupterm("xterm", 1)
+            return interp_curses._curses_tigetstr('cup')
+
+        fn = compile(runs_ctgetstr, [])
+        res = fn()
+        assert res == '\x1b[%i%p1%d;%p2%dH'
+
+    def test_ctparm(self):
+        from rpython.translator.c.test.test_genc import compile
+        from pypy.module._minimal_curses import interp_curses
+        def runs_tparm():
+            interp_curses._curses_setupterm("xterm", 1)
+            cup = interp_curses._curses_tigetstr('cup')
+            return interp_curses._curses_tparm(cup, [5, 3])
+
+        fn = compile(runs_tparm, [])
+        res = fn()
+        assert res == '\033[6;4H'
+


More information about the pypy-commit mailing list