[pypy-commit] pypy default: readd _minimal_curses
fijal
noreply at buildbot.pypy.org
Wed Apr 3 11:10:02 CEST 2013
Author: Maciej Fijalkowski <fijall at gmail.com>
Branch:
Changeset: r62959:3dcd89310152
Date: 2013-04-03 11:08 +0200
http://bitbucket.org/pypy/pypy/changeset/3dcd89310152/
Log: readd _minimal_curses
diff --git a/pypy/config/pypyoption.py b/pypy/config/pypyoption.py
--- a/pypy/config/pypyoption.py
+++ b/pypy/config/pypyoption.py
@@ -31,7 +31,7 @@
["_socket", "unicodedata", "mmap", "fcntl", "_locale", "pwd",
"rctime" , "select", "zipimport", "_lsprof",
"crypt", "signal", "_rawffi", "termios", "zlib", "bz2",
- "struct", "_hashlib", "_md5", "_sha", "cStringIO",
+ "struct", "_hashlib", "_md5", "_sha", "_minimal_curses", "cStringIO",
"thread", "itertools", "pyexpat", "_ssl", "cpyext", "array",
"binascii", "_multiprocessing", '_warnings',
"_collections", "_multibytecodec", "micronumpy", "_ffi",
@@ -45,7 +45,7 @@
"binascii",
# the following are needed for pyrepl (and hence for the
# interactive prompt/pdb)
- "termios",
+ "termios", "_minimal_curses",
]))
working_oo_modules = default_modules.copy()
@@ -62,6 +62,7 @@
del working_modules["fcntl"]
del working_modules["pwd"]
del working_modules["termios"]
+ del working_modules["_minimal_curses"]
# The _locale module is needed by site.py on Windows
default_modules["_locale"] = None
@@ -71,6 +72,7 @@
del working_modules['rctime'] # depend on ctypes, missing tm_zone/tm_gmtoff
del working_modules['signal'] # depend on ctypes, can't get at c-level 'errono'
del working_modules['fcntl'] # LOCK_NB not defined
+ del working_modules["_minimal_curses"]
del working_modules["termios"]
del working_modules["_multiprocessing"] # depends on rctime
@@ -101,6 +103,7 @@
"pyexpat" : ["pypy.module.pyexpat.interp_pyexpat"],
"_ssl" : ["pypy.module._ssl.interp_ssl"],
"_hashlib" : ["pypy.module._ssl.interp_ssl"],
+ "_minimal_curses": ["pypy.module._minimal_curses.fficurses"],
"_continuation": ["rpython.rlib.rstacklet"],
}
diff --git a/pypy/module/_minimal_curses/__init__.py b/pypy/module/_minimal_curses/__init__.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_minimal_curses/__init__.py
@@ -0,0 +1,32 @@
+try:
+ import _curses
+except ImportError:
+ try:
+ import _minimal_curses as _curses # when running on top of pypy-c
+ except ImportError:
+ import py
+ py.test.skip("no _curses or _minimal_curses module") #no _curses at all
+
+from pypy.interpreter.mixedmodule import MixedModule
+from pypy.module._minimal_curses import fficurses # for side effects
+
+
+class Module(MixedModule):
+ """ Low-level interface for curses module,
+ not meant to be used directly
+ """
+
+ appleveldefs = {
+ 'error' : 'app_curses.error',
+ }
+
+ interpleveldefs = {
+ 'setupterm' : 'interp_curses.setupterm',
+ 'tigetstr' : 'interp_curses.tigetstr',
+ 'tparm' : 'interp_curses.tparm',
+ }
+
+for i in dir(_curses):
+ val = getattr(_curses, i)
+ if i.isupper() and type(val) is int:
+ Module.interpleveldefs[i] = "space.wrap(%s)" % val
diff --git a/pypy/module/_minimal_curses/app_curses.py b/pypy/module/_minimal_curses/app_curses.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_minimal_curses/app_curses.py
@@ -0,0 +1,3 @@
+
+class error(Exception):
+ pass
diff --git a/pypy/module/_minimal_curses/fficurses.py b/pypy/module/_minimal_curses/fficurses.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_minimal_curses/fficurses.py
@@ -0,0 +1,113 @@
+
+""" The ffi for rpython, need to be imported for side effects
+"""
+
+from rpython.rtyper.lltypesystem import rffi
+from rpython.rtyper.lltypesystem import lltype
+from rpython.rtyper.tool import rffi_platform
+from rpython.rtyper.extfunc import register_external
+from pypy.module._minimal_curses import interp_curses
+from rpython.translator.tool.cbuild import ExternalCompilationInfo
+from sys import platform
+import os.path
+
+_CYGWIN = platform == 'cygwin'
+_NCURSES_CURSES = os.path.isfile("/usr/include/ncurses/curses.h")
+
+if _CYGWIN or _NCURSES_CURSES:
+ eci = ExternalCompilationInfo(
+ includes = ['ncurses/curses.h', 'ncurses/term.h'],
+ libraries = ['curses'],
+ )
+else:
+ eci = ExternalCompilationInfo(
+ includes = ['curses.h', 'term.h'],
+ libraries = ['curses'],
+ )
+
+rffi_platform.verify_eci(eci)
+
+
+INT = rffi.INT
+INTP = lltype.Ptr(lltype.Array(INT, hints={'nolength':True}))
+c_setupterm = rffi.llexternal('setupterm', [rffi.CCHARP, INT, INTP], INT,
+ compilation_info=eci)
+c_tigetstr = rffi.llexternal('tigetstr', [rffi.CCHARP], rffi.CCHARP,
+ compilation_info=eci)
+c_tparm = rffi.llexternal('tparm', [rffi.CCHARP, INT, INT, INT, INT, INT,
+ INT, INT, INT, INT], rffi.CCHARP,
+ compilation_info=eci)
+
+ERR = rffi.CConstant('ERR', lltype.Signed)
+OK = rffi.CConstant('OK', lltype.Signed)
+
+def curses_setupterm(term, fd):
+ intp = lltype.malloc(INTP.TO, 1, flavor='raw')
+ err = rffi.cast(lltype.Signed, c_setupterm(term, fd, intp))
+ try:
+ if err == ERR:
+ errret = rffi.cast(lltype.Signed, intp[0])
+ if errret == 0:
+ msg = "setupterm: could not find terminal"
+ elif errret == -1:
+ msg = "setupterm: could not find terminfo database"
+ else:
+ msg = "setupterm: unknown error"
+ raise interp_curses.curses_error(msg)
+ interp_curses.module_info.setupterm_called = True
+ finally:
+ lltype.free(intp, flavor='raw')
+
+def curses_setupterm_null_llimpl(fd):
+ curses_setupterm(lltype.nullptr(rffi.CCHARP.TO), fd)
+
+def curses_setupterm_llimpl(term, fd):
+ ll_s = rffi.str2charp(term)
+ try:
+ curses_setupterm(ll_s, fd)
+ finally:
+ rffi.free_charp(ll_s)
+
+register_external(interp_curses._curses_setupterm_null,
+ [int], llimpl=curses_setupterm_null_llimpl,
+ export_name='_curses.setupterm_null')
+register_external(interp_curses._curses_setupterm,
+ [str, int], llimpl=curses_setupterm_llimpl,
+ export_name='_curses.setupterm')
+
+def check_setup_invoked():
+ if not interp_curses.module_info.setupterm_called:
+ raise interp_curses.curses_error("must call (at least) setupterm() first")
+
+def tigetstr_llimpl(cap):
+ check_setup_invoked()
+ ll_cap = rffi.str2charp(cap)
+ try:
+ ll_res = c_tigetstr(ll_cap)
+ num = lltype.cast_ptr_to_int(ll_res)
+ if num == 0 or num == -1:
+ raise interp_curses.TermError()
+ res = rffi.charp2str(ll_res)
+ return res
+ finally:
+ rffi.free_charp(ll_cap)
+
+register_external(interp_curses._curses_tigetstr, [str], str,
+ export_name='_curses.tigetstr', llimpl=tigetstr_llimpl)
+
+def tparm_llimpl(s, args):
+ check_setup_invoked()
+ l = [0, 0, 0, 0, 0, 0, 0, 0, 0]
+ for i in range(min(len(args), 9)):
+ l[i] = args[i]
+ ll_s = rffi.str2charp(s)
+ # XXX nasty trick stolen from CPython
+ ll_res = c_tparm(ll_s, l[0], l[1], l[2], l[3], l[4], l[5], l[6],
+ l[7], l[8])
+ rffi.free_charp(ll_s)
+ res = rffi.charp2str(ll_res)
+ return res
+
+register_external(interp_curses._curses_tparm, [str, [int]], str,
+ export_name='_curses.tparm', llimpl=tparm_llimpl)
+
diff --git a/pypy/module/_minimal_curses/interp_curses.py b/pypy/module/_minimal_curses/interp_curses.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_minimal_curses/interp_curses.py
@@ -0,0 +1,94 @@
+
+from pypy.interpreter.gateway import unwrap_spec
+from pypy.interpreter.error import OperationError
+from pypy.module._minimal_curses import _curses
+
+class ModuleInfo:
+ def __init__(self):
+ self.setupterm_called = False
+
+module_info = ModuleInfo()
+
+class curses_error(Exception):
+ def __init__(self, msg):
+ self.msg = msg
+
+from rpython.annotator.classdef import FORCE_ATTRIBUTES_INTO_CLASSES
+from rpython.annotator.model import SomeString
+
+# this is necessary due to annmixlevel
+FORCE_ATTRIBUTES_INTO_CLASSES[curses_error] = {'msg': SomeString()}
+
+def convert_error(space, error):
+ msg = error.msg
+ w_module = space.getbuiltinmodule('_minimal_curses')
+ w_exception_class = space.getattr(w_module, space.wrap('error'))
+ w_exception = space.call_function(w_exception_class, space.wrap(msg))
+ return OperationError(w_exception_class, w_exception)
+
+def _curses_setupterm_null(fd):
+ # NOT_RPYTHON
+ try:
+ _curses.setupterm(None, fd)
+ except _curses.error, e:
+ raise curses_error(e.args[0])
+
+def _curses_setupterm(termname, fd):
+ # NOT_RPYTHON
+ try:
+ _curses.setupterm(termname, fd)
+ except _curses.error, e:
+ raise curses_error(e.args[0])
+
+@unwrap_spec(fd=int)
+def setupterm(space, w_termname=None, fd=-1):
+ if fd == -1:
+ w_stdout = space.getattr(space.getbuiltinmodule('sys'),
+ space.wrap('stdout'))
+ fd = space.int_w(space.call_function(space.getattr(w_stdout,
+ space.wrap('fileno'))))
+ try:
+ if space.is_none(w_termname):
+ _curses_setupterm_null(fd)
+ else:
+ _curses_setupterm(space.str_w(w_termname), fd)
+ except curses_error, e:
+ raise convert_error(space, e)
+
+class TermError(Exception):
+ pass
+
+def _curses_tigetstr(capname):
+ # NOT_RPYTHON
+ try:
+ res = _curses.tigetstr(capname)
+ except _curses.error, e:
+ raise curses_error(e.args[0])
+ if res is None:
+ raise TermError
+ return res
+
+def _curses_tparm(s, args):
+ # NOT_RPYTHON
+ try:
+ return _curses.tparm(s, *args)
+ except _curses.error, e:
+ raise curses_error(e.args[0])
+
+@unwrap_spec(capname=str)
+def tigetstr(space, capname):
+ try:
+ result = _curses_tigetstr(capname)
+ except TermError:
+ return space.w_None
+ except curses_error, e:
+ raise convert_error(space, e)
+ return space.wrap(result)
+
+@unwrap_spec(s=str)
+def tparm(space, s, args_w):
+ args = [space.int_w(a) for a in args_w]
+ try:
+ return space.wrap(_curses_tparm(s, args))
+ except curses_error, e:
+ raise convert_error(space, e)
diff --git a/pypy/module/_minimal_curses/test/__init__.py b/pypy/module/_minimal_curses/test/__init__.py
new file mode 100644
diff --git a/pypy/module/_minimal_curses/test/test_curses.py b/pypy/module/_minimal_curses/test/test_curses.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_minimal_curses/test/test_curses.py
@@ -0,0 +1,108 @@
+from pypy.conftest import pypydir
+from rpython.tool.udir import udir
+import py
+import sys
+# tests here are run as snippets through a pexpected python subprocess
+
+
+def setup_module(mod):
+ try:
+ import curses
+ curses.setupterm()
+ except:
+ py.test.skip("Cannot test this here")
+
+class TestCurses(object):
+ """ We need to fork here, to prevent
+ the setup to be done
+ """
+ def _spawn(self, *args, **kwds):
+ import pexpect
+ kwds.setdefault('timeout', 600)
+ print 'SPAWN:', args, kwds
+ child = pexpect.spawn(*args, **kwds)
+ child.logfile = sys.stdout
+ return child
+
+ def spawn(self, argv):
+ py_py = py.path.local(pypydir).join('bin', 'pyinteractive.py')
+ return self._spawn(sys.executable, [str(py_py)] + argv)
+
+ def setup_class(self):
+ try:
+ import pexpect
+ except ImportError:
+ py.test.skip('pexpect not found')
+
+ def test_setupterm(self):
+ source = py.code.Source("""
+ import _minimal_curses
+ try:
+ _minimal_curses.tigetstr('cup')
+ except _minimal_curses.error:
+ print 'ok!'
+ """)
+ f = udir.join("test_setupterm.py")
+ f.write(source)
+ child = self.spawn(['--withmod-_minimal_curses', str(f)])
+ child.expect('ok!')
+
+ def test_tigetstr(self):
+ source = py.code.Source("""
+ import _minimal_curses
+ _minimal_curses.setupterm()
+ assert _minimal_curses.tigetstr('cup') == '\x1b[%i%p1%d;%p2%dH'
+ print 'ok!'
+ """)
+ f = udir.join("test_tigetstr.py")
+ f.write(source)
+ child = self.spawn(['--withmod-_minimal_curses', str(f)])
+ child.expect('ok!')
+
+ def test_tparm(self):
+ source = py.code.Source("""
+ import _minimal_curses
+ _minimal_curses.setupterm()
+ assert _minimal_curses.tparm(_minimal_curses.tigetstr('cup'), 5, 3) == '\033[6;4H'
+ print 'ok!'
+ """)
+ f = udir.join("test_tparm.py")
+ f.write(source)
+ child = self.spawn(['--withmod-_minimal_curses', str(f)])
+ child.expect('ok!')
+
+class TestCCurses(object):
+ """ Test compiled version
+ """
+ def test_csetupterm(self):
+ from rpython.translator.c.test.test_genc import compile
+ from pypy.module._minimal_curses import interp_curses
+ def runs_setupterm():
+ interp_curses._curses_setupterm_null(1)
+
+ fn = compile(runs_setupterm, [])
+ fn()
+
+ def test_ctgetstr(self):
+ from rpython.translator.c.test.test_genc import compile
+ from pypy.module._minimal_curses import interp_curses
+ def runs_ctgetstr():
+ interp_curses._curses_setupterm("xterm", 1)
+ return interp_curses._curses_tigetstr('cup')
+
+ fn = compile(runs_ctgetstr, [])
+ res = fn()
+ assert res == '\x1b[%i%p1%d;%p2%dH'
+
+ def test_ctparm(self):
+ from rpython.translator.c.test.test_genc import compile
+ from pypy.module._minimal_curses import interp_curses
+ def runs_tparm():
+ interp_curses._curses_setupterm("xterm", 1)
+ cup = interp_curses._curses_tigetstr('cup')
+ return interp_curses._curses_tparm(cup, [5, 3])
+
+ fn = compile(runs_tparm, [])
+ res = fn()
+ assert res == '\033[6;4H'
+
More information about the pypy-commit
mailing list