[pypy-commit] pypy sandbox-2: Fix some of the tests
arigo
pypy.commits at gmail.com
Tue Aug 27 02:43:07 EDT 2019
Author: Armin Rigo <arigo at tunes.org>
Branch: sandbox-2
Changeset: r97288:7177f77afa08
Date: 2019-08-26 23:04 +0200
http://bitbucket.org/pypy/pypy/changeset/7177f77afa08/
Log: Fix some of the tests
diff --git a/rpython/translator/sandbox/graphchecker.py b/rpython/translator/sandbox/graphchecker.py
--- a/rpython/translator/sandbox/graphchecker.py
+++ b/rpython/translator/sandbox/graphchecker.py
@@ -105,7 +105,7 @@
elif opname in ('cast_ptr_to_adr', 'force_cast',
'cast_int_to_ptr'):
if is_gc_ptr(op.result.concretetype):
- return "result is a GC ptr: %r" % (opname,)
+ return "result is a GC ptr: %r" % (op,)
else:
return "unsupported llop: %r" % (opname,)
diff --git a/rpython/translator/sandbox/sandlib.py b/rpython/translator/sandbox/sandlib.py
deleted file mode 100644
--- a/rpython/translator/sandbox/sandlib.py
+++ /dev/null
@@ -1,517 +0,0 @@
-"""
-A Python library to execute and communicate with a subprocess that
-was translated from RPython code with --sandbox. This library is
-for the outer process, which can run CPython or PyPy.
-"""
-
-import sys, os, posixpath, errno, stat, time
-import subprocess
-from rpython.tool.killsubprocess import killsubprocess
-from rpython.translator.sandbox.vfs import UID, GID
-import py
-
-WIN32 = os.name == "nt"
-
-
-def create_log():
- """Make and return a log for the sandbox to use, if needed."""
- from rpython.tool.ansi_print import AnsiLogger
- return AnsiLogger("sandlib")
-
-def write_exception(g, exception, tb=None):
- for i, excclass in EXCEPTION_TABLE:
- if isinstance(exception, excclass):
- write_message(g, i)
- if excclass is OSError:
- error = exception.errno
- if error is None:
- error = errno.EPERM
- write_message(g, error)
- g.flush()
- break
- else:
- # just re-raise the exception
- raise exception.__class__, exception, tb
-
-def shortrepr(x):
- r = repr(x)
- if len(r) >= 80:
- r = r[:20] + '...' + r[-8:]
- return r
-
-def signal_name(n):
- import signal
- for key, value in signal.__dict__.items():
- if key.startswith('SIG') and not key.startswith('SIG_') and value == n:
- return key
- return 'signal %d' % (n,)
-
-
-class SandboxedProc(object):
- """Base class to control a sandboxed subprocess.
- Inherit from this class and implement all the do_xxx() methods
- for the external functions xxx that you want to support.
- """
- debug = False
- log = None
- os_level_sandboxing = False # Linux only: /proc/PID/seccomp
-
- def __init__(self, args, executable=None):
- """'args' should a sequence of argument for the subprocess,
- starting with the full path of the executable.
- """
- self.popen = subprocess.Popen(args, executable=executable,
- bufsize=-1,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- close_fds=False if WIN32 else True,
- env={})
- self.popenlock = None
- self.currenttimeout = None
- self.currentlyidlefrom = None
-
- if self.debug:
- self.log = create_log()
-
- def withlock(self, function, *args, **kwds):
- lock = self.popenlock
- if lock is not None:
- lock.acquire()
- try:
- return function(*args, **kwds)
- finally:
- if lock is not None:
- lock.release()
-
- def settimeout(self, timeout, interrupt_main=False):
- """Start a timeout that will kill the subprocess after the given
- amount of time. Only one timeout can be active at a time.
- """
- import thread
-
- def _waiting_thread():
- while True:
- while self.currentlyidlefrom is not None:
- time.sleep(1) # can't timeout while idle
- t = self.currenttimeout
- if t is None:
- return # cancelled
- delay = t - time.time()
- if delay <= 0.0:
- break # expired!
- time.sleep(min(delay*1.001, 1))
- if self.log:
- self.log.timeout("timeout!")
- self.kill()
- #if interrupt_main:
- # if hasattr(os, 'kill'):
- # import signal
- # os.kill(os.getpid(), signal.SIGINT)
- # else:
- # thread.interrupt_main()
-
- def _settimeout():
- need_new_thread = self.currenttimeout is None
- self.currenttimeout = time.time() + timeout
- if need_new_thread:
- thread.start_new_thread(_waiting_thread, ())
-
- if self.popenlock is None:
- self.popenlock = thread.allocate_lock()
- self.withlock(_settimeout)
-
- def canceltimeout(self):
- """Cancel the current timeout."""
- self.currenttimeout = None
- self.currentlyidlefrom = None
-
- def enter_idle(self):
- self.currentlyidlefrom = time.time()
-
- def leave_idle(self):
- def _postpone_timeout():
- t = self.currentlyidlefrom
- if t is not None and self.currenttimeout is not None:
- self.currenttimeout += time.time() - t
- try:
- self.withlock(_postpone_timeout)
- finally:
- self.currentlyidlefrom = None
-
- def poll(self):
- returncode = self.withlock(self.popen.poll)
- if returncode is not None:
- self.canceltimeout()
- return returncode
-
- def wait(self):
- returncode = self.withlock(self.popen.wait)
- if returncode is not None:
- self.canceltimeout()
- return returncode
-
- def kill(self):
- self.withlock(killsubprocess, self.popen)
-
- def handle_forever(self):
- returncode = self.handle_until_return()
- if returncode != 0:
- raise OSError("the sandboxed subprocess exited with code %d" % (
- returncode,))
-
- def handle_until_return(self):
- child_stdin = self.popen.stdin
- child_stdout = self.popen.stdout
- if self.os_level_sandboxing and sys.platform.startswith('linux'):
- # rationale: we wait until the child process started completely,
- # letting the C library do any system calls it wants for
- # initialization. When the RPython code starts up, it quickly
- # does its first system call. At this point we turn seccomp on.
- import select
- select.select([child_stdout], [], [])
- f = open('/proc/%d/seccomp' % self.popen.pid, 'w')
- print >> f, 1
- f.close()
- while True:
- try:
- fnname = read_message(child_stdout)
- args = read_message(child_stdout)
- except EOFError as e:
- break
- if self.log and not self.is_spam(fnname, *args):
- self.log.call('%s(%s)' % (fnname,
- ', '.join([shortrepr(x) for x in args])))
- try:
- answer, resulttype = self.handle_message(fnname, *args)
- except Exception as e:
- tb = sys.exc_info()[2]
- write_exception(child_stdin, e, tb)
- if self.log:
- if str(e):
- self.log.exception('%s: %s' % (e.__class__.__name__, e))
- else:
- self.log.exception('%s' % (e.__class__.__name__,))
- else:
- if self.log and not self.is_spam(fnname, *args):
- self.log.result(shortrepr(answer))
- try:
- write_message(child_stdin, 0) # error code - 0 for ok
- write_message(child_stdin, answer, resulttype)
- child_stdin.flush()
- except (IOError, OSError):
- # likely cause: subprocess is dead, child_stdin closed
- if self.poll() is not None:
- break
- else:
- raise
- returncode = self.wait()
- return returncode
-
- def is_spam(self, fnname, *args):
- # To hide the spamming amounts of reads and writes to stdin and stdout
- # in interactive sessions
- return (fnname in ('ll_os.ll_os_read', 'll_os.ll_os_write') and
- args[0] in (0, 1, 2))
-
- def handle_message(self, fnname, *args):
- if '__' in fnname:
- raise ValueError("unsafe fnname")
- try:
- handler = getattr(self, 'do_' + fnname.replace('.', '__'))
- except AttributeError:
- raise RuntimeError("no handler for this function")
- resulttype = getattr(handler, 'resulttype', None)
- return handler(*args), resulttype
-
-
-class SimpleIOSandboxedProc(SandboxedProc):
- """Control a sandboxed subprocess which is only allowed to read from
- its stdin and write to its stdout and stderr.
- """
- _input = None
- _output = None
- _error = None
- inputlogfile = None
-
- def communicate(self, input=None):
- """Send data to stdin. Read data from stdout and stderr,
- until end-of-file is reached. Wait for process to terminate.
- """
- import cStringIO
- if input:
- if isinstance(input, str):
- input = cStringIO.StringIO(input)
- self._input = input
- self._output = cStringIO.StringIO()
- self._error = cStringIO.StringIO()
- self.handle_forever()
- output = self._output.getvalue()
- self._output = None
- error = self._error.getvalue()
- self._error = None
- return (output, error)
-
- def interact(self, stdin=None, stdout=None, stderr=None):
- """Interact with the subprocess. By default, stdin, stdout and
- stderr are set to the ones from 'sys'."""
- import sys
- self._input = stdin or sys.stdin
- self._output = stdout or sys.stdout
- self._error = stderr or sys.stderr
- returncode = self.handle_until_return()
- if returncode != 0:
- if os.name == 'posix' and returncode < 0:
- print >> self._error, "[Subprocess killed by %s]" % (
- signal_name(-returncode),)
- else:
- print >> self._error, "[Subprocess exit code: %d]" % (
- returncode,)
- self._input = None
- self._output = None
- self._error = None
- return returncode
-
- def setlogfile(self, filename):
- self.inputlogfile = open(filename, 'a')
-
- def do_ll_os__ll_os_read(self, fd, size):
- if fd == 0:
- if self._input is None:
- return ""
- elif (getattr(self, 'virtual_console_isatty', False) or
- self._input.isatty()):
- # don't wait for all 'size' chars if reading from a tty,
- # to avoid blocking. Instead, stop after reading a line.
-
- # For now, waiting at the interactive console is the
- # only time that counts as idle.
- self.enter_idle()
- try:
- inputdata = self._input.readline(size)
- finally:
- self.leave_idle()
- else:
- inputdata = self._input.read(size)
- if self.inputlogfile is not None:
- self.inputlogfile.write(inputdata)
- return inputdata
- raise OSError("trying to read from fd %d" % (fd,))
-
- def do_ll_os__ll_os_write(self, fd, data):
- if fd == 1:
- self._output.write(data)
- return len(data)
- if fd == 2:
- self._error.write(data)
- return len(data)
- raise OSError("trying to write to fd %d" % (fd,))
-
- # let's allow access to the real time
- def do_ll_time__ll_time_sleep(self, seconds):
- # regularly check for timeouts that could have killed the
- # subprocess
- while seconds > 5.0:
- time.sleep(5.0)
- seconds -= 5.0
- if self.poll() is not None: # subprocess finished?
- return
- time.sleep(seconds)
-
- def do_ll_time__ll_time_time(self):
- return time.time()
-
- def do_ll_time__ll_time_clock(self):
- # measuring the CPU time of the controller process has
- # not much meaning, so let's emulate this and return
- # the real time elapsed since the first call to clock()
- # (this is one of the behaviors allowed by the docs)
- try:
- starttime = self.starttime
- except AttributeError:
- starttime = self.starttime = time.time()
- return time.time() - starttime
-
-class VirtualizedSandboxedProc(SandboxedProc):
- """Control a virtualized sandboxed process, which is given a custom
- view on the filesystem and a custom environment.
- """
- virtual_env = {}
- virtual_cwd = '/tmp'
- virtual_console_isatty = False
- virtual_fd_range = range(3, 50)
-
- def __init__(self, *args, **kwds):
- super(VirtualizedSandboxedProc, self).__init__(*args, **kwds)
- self.virtual_root = self.build_virtual_root()
- self.open_fds = {} # {virtual_fd: (real_file_object, node)}
-
- def build_virtual_root(self):
- raise NotImplementedError("must be overridden")
-
- def do_ll_os__ll_os_envitems(self):
- return self.virtual_env.items()
-
- def do_ll_os__ll_os_getenv(self, name):
- return self.virtual_env.get(name)
-
- def translate_path(self, vpath):
- # XXX this assumes posix vpaths for now, but os-specific real paths
- vpath = posixpath.normpath(posixpath.join(self.virtual_cwd, vpath))
- dirnode = self.virtual_root
- components = [component for component in vpath.split('/')]
- for component in components[:-1]:
- if component:
- dirnode = dirnode.join(component)
- if dirnode.kind != stat.S_IFDIR:
- raise OSError(errno.ENOTDIR, component)
- return dirnode, components[-1]
-
- def get_node(self, vpath):
- dirnode, name = self.translate_path(vpath)
- if name:
- node = dirnode.join(name)
- else:
- node = dirnode
- if self.log:
- self.log.vpath('%r => %r' % (vpath, node))
- return node
-
- def do_ll_os__ll_os_stat(self, vpathname):
- node = self.get_node(vpathname)
- return node.stat()
- do_ll_os__ll_os_stat.resulttype = RESULTTYPE_STATRESULT
-
- do_ll_os__ll_os_lstat = do_ll_os__ll_os_stat
-
- def do_ll_os__ll_os_access(self, vpathname, mode):
- try:
- node = self.get_node(vpathname)
- except OSError as e:
- if e.errno == errno.ENOENT:
- return False
- raise
- return node.access(mode)
-
- def do_ll_os__ll_os_isatty(self, fd):
- return self.virtual_console_isatty and fd in (0, 1, 2)
-
- def allocate_fd(self, f, node=None):
- for fd in self.virtual_fd_range:
- if fd not in self.open_fds:
- self.open_fds[fd] = (f, node)
- return fd
- else:
- raise OSError(errno.EMFILE, "trying to open too many files")
-
- def get_fd(self, fd, throw=True):
- """Get the objects implementing file descriptor `fd`.
-
- Returns a pair, (open file, vfs node)
-
- `throw`: if true, raise OSError for bad fd, else return (None, None).
- """
- try:
- f, node = self.open_fds[fd]
- except KeyError:
- if throw:
- raise OSError(errno.EBADF, "bad file descriptor")
- return None, None
- return f, node
-
- def get_file(self, fd, throw=True):
- """Return the open file for file descriptor `fd`."""
- return self.get_fd(fd, throw)[0]
-
- def do_ll_os__ll_os_open(self, vpathname, flags, mode):
- node = self.get_node(vpathname)
- if flags & (os.O_RDONLY|os.O_WRONLY|os.O_RDWR) != os.O_RDONLY:
- raise OSError(errno.EPERM, "write access denied")
- # all other flags are ignored
- f = node.open()
- return self.allocate_fd(f, node)
-
- def do_ll_os__ll_os_close(self, fd):
- f = self.get_file(fd)
- del self.open_fds[fd]
- f.close()
-
- def do_ll_os__ll_os_read(self, fd, size):
- f = self.get_file(fd, throw=False)
- if f is None:
- return super(VirtualizedSandboxedProc, self).do_ll_os__ll_os_read(
- fd, size)
- else:
- if not (0 <= size <= sys.maxint):
- raise OSError(errno.EINVAL, "invalid read size")
- # don't try to read more than 256KB at once here
- return f.read(min(size, 256*1024))
-
- def do_ll_os__ll_os_fstat(self, fd):
- f, node = self.get_fd(fd)
- return node.stat()
- do_ll_os__ll_os_fstat.resulttype = RESULTTYPE_STATRESULT
-
- def do_ll_os__ll_os_lseek(self, fd, pos, how):
- f = self.get_file(fd)
- f.seek(pos, how)
- return f.tell()
- do_ll_os__ll_os_lseek.resulttype = RESULTTYPE_LONGLONG
-
- def do_ll_os__ll_os_getcwd(self):
- return self.virtual_cwd
-
- def do_ll_os__ll_os_strerror(self, errnum):
- # unsure if this shouldn't be considered safeboxsafe
- return os.strerror(errnum) or ('Unknown error %d' % (errnum,))
-
- def do_ll_os__ll_os_listdir(self, vpathname):
- node = self.get_node(vpathname)
- return node.keys()
-
- def do_ll_os__ll_os_unlink(self, vpathname):
- raise OSError(errno.EPERM, "write access denied")
-
- def do_ll_os__ll_os_mkdir(self, vpathname, mode=None):
- raise OSError(errno.EPERM, "write access denied")
-
- def do_ll_os__ll_os_getuid(self):
- return UID
- do_ll_os__ll_os_geteuid = do_ll_os__ll_os_getuid
-
- def do_ll_os__ll_os_getgid(self):
- return GID
- do_ll_os__ll_os_getegid = do_ll_os__ll_os_getgid
-
-
-class VirtualizedSocketProc(VirtualizedSandboxedProc):
- """ Extends VirtualizedSandboxProc with socket
- options, ie tcp://host:port as args to os.open
- """
- def __init__(self, *args, **kwds):
- super(VirtualizedSocketProc, self).__init__(*args, **kwds)
- self.sockets = {}
-
- def do_ll_os__ll_os_open(self, name, flags, mode):
- if not name.startswith("tcp://"):
- return super(VirtualizedSocketProc, self).do_ll_os__ll_os_open(
- name, flags, mode)
- import socket
- host, port = name[6:].split(":")
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect((host, int(port)))
- fd = self.allocate_fd(sock)
- self.sockets[fd] = True
- return fd
-
- def do_ll_os__ll_os_read(self, fd, size):
- if fd in self.sockets:
- return self.get_file(fd).recv(size)
- return super(VirtualizedSocketProc, self).do_ll_os__ll_os_read(
- fd, size)
-
- def do_ll_os__ll_os_write(self, fd, data):
- if fd in self.sockets:
- return self.get_file(fd).send(data)
- return super(VirtualizedSocketProc, self).do_ll_os__ll_os_write(
- fd, data)
-
diff --git a/rpython/translator/sandbox/test/test_graphchecker.py b/rpython/translator/sandbox/test/test_graphchecker.py
--- a/rpython/translator/sandbox/test/test_graphchecker.py
+++ b/rpython/translator/sandbox/test/test_graphchecker.py
@@ -52,7 +52,11 @@
return llop.force_cast(lltype.Signed, x)
self.check_safe(f, [float])
self.check_safe(f, [lltype.Ptr(SRAW)])
- self.check_unsafe("argument is a GC ptr", f, [lltype.Ptr(SGC)])
+ self.check_safe(f, [lltype.Ptr(SGC)])
+ #
+ def g(x):
+ return llop.force_cast(lltype.Ptr(SGC), x)
+ self.check_unsafe("result is a GC ptr", g, [int])
def test_direct_call_to_check_caller(self):
@sandbox_review(check_caller=True)
diff --git a/rpython/translator/sandbox/test/test_sandlib.py b/rpython/translator/sandbox/test/test_sandlib.py
deleted file mode 100644
--- a/rpython/translator/sandbox/test/test_sandlib.py
+++ /dev/null
@@ -1,267 +0,0 @@
-import py
-import errno, os, StringIO
-from rpython.tool.sourcetools import func_with_new_name
-from rpython.rtyper.lltypesystem import rffi
-from rpython.translator.sandbox.sandlib import SandboxedProc
-from rpython.translator.sandbox.sandlib import SimpleIOSandboxedProc
-from rpython.translator.sandbox.sandlib import VirtualizedSandboxedProc
-from rpython.translator.sandbox.sandlib import VirtualizedSocketProc
-from rpython.translator.sandbox.test.test_sandbox import compile
-from rpython.translator.sandbox.vfs import Dir, File, RealDir, RealFile
-
-
-class MockSandboxedProc(SandboxedProc):
- """A sandbox process wrapper that replays expected syscalls."""
-
- def __init__(self, args, expected):
- SandboxedProc.__init__(self, args)
- self.expected = expected
- self.seen = 0
-
- def _make_method(name):
- def do_xxx(self, *input):
- print "decoded from subprocess: %s%r" % (name, input)
- expectedmsg, expectedinput, output = self.expected[self.seen]
- assert name == expectedmsg
- assert input == expectedinput
- self.seen += 1
- if isinstance(output, Exception):
- raise output
- return output
- return func_with_new_name(do_xxx, 'do_%s' % name)
-
- do_ll_os__ll_os_open = _make_method("open")
- do_ll_os__ll_os_read = _make_method("read")
- do_ll_os__ll_os_write = _make_method("write")
- do_ll_os__ll_os_close = _make_method("close")
-
-
-def test_lib():
- def entry_point(argv):
- fd = os.open("/tmp/foobar", os.O_RDONLY, 0777)
- assert fd == 77
- res = os.read(fd, 123)
- assert res == "he\x00llo"
- count = os.write(fd, "world\x00!\x00")
- assert count == 42
- for arg in argv:
- count = os.write(fd, arg)
- assert count == 61
- os.close(fd)
- return 0
- exe = compile(entry_point)
-
- proc = MockSandboxedProc([exe, 'x1', 'y2'], expected = [
- ("open", ("/tmp/foobar", os.O_RDONLY, 0777), 77),
- ("read", (77, 123), "he\x00llo"),
- ("write", (77, "world\x00!\x00"), 42),
- ("write", (77, exe), 61),
- ("write", (77, "x1"), 61),
- ("write", (77, "y2"), 61),
- ("close", (77,), None),
- ])
- proc.handle_forever()
- assert proc.seen == len(proc.expected)
-
-def test_foobar():
- py.test.skip("to be updated")
- foobar = rffi.llexternal("foobar", [rffi.CCHARP], rffi.LONG)
- def entry_point(argv):
- s = rffi.str2charp(argv[1]); n = foobar(s); rffi.free_charp(s)
- s = rffi.str2charp(argv[n]); n = foobar(s); rffi.free_charp(s)
- return n
- exe = compile(entry_point)
-
- proc = MockSandboxedProc([exe, 'spam', 'egg'], expected = [
- ("foobar", ("spam",), 2),
- ("foobar", ("egg",), 0),
- ])
- proc.handle_forever()
- assert proc.seen == len(proc.expected)
-
-def test_simpleio():
- def entry_point(argv):
- print "Please enter a number:"
- buf = ""
- while True:
- t = os.read(0, 1) # 1 character from stdin
- if not t:
- raise EOFError
- if t == '\n':
- break
- buf += t
- num = int(buf)
- print "The double is:", num * 2
- return 0
- exe = compile(entry_point)
-
- proc = SimpleIOSandboxedProc([exe, 'x1', 'y2'])
- output, error = proc.communicate("21\n")
- assert output == "Please enter a number:\nThe double is: 42\n"
- assert error == ""
-
-def test_socketio():
- class SocketProc(VirtualizedSocketProc, SimpleIOSandboxedProc):
- def build_virtual_root(self):
- pass
-
- def entry_point(argv):
- fd = os.open("tcp://python.org:80", os.O_RDONLY, 0777)
- os.write(fd, 'GET /\n')
- print os.read(fd, 50)
- return 0
- exe = compile(entry_point)
-
- proc = SocketProc([exe])
- output, error = proc.communicate("")
- assert output.startswith('HTTP/1.0 400 Bad request')
-
-def test_oserror():
- def entry_point(argv):
- try:
- os.open("/tmp/foobar", os.O_RDONLY, 0777)
- except OSError as e:
- os.close(e.errno) # nonsense, just to see outside
- return 0
- exe = compile(entry_point)
-
- proc = MockSandboxedProc([exe], expected = [
- ("open", ("/tmp/foobar", os.O_RDONLY, 0777), OSError(-42, "baz")),
- ("close", (-42,), None),
- ])
- proc.handle_forever()
- assert proc.seen == len(proc.expected)
-
-
-class SandboxedProcWithFiles(VirtualizedSandboxedProc, SimpleIOSandboxedProc):
- """A sandboxed process with a simple virtualized filesystem.
-
- For testing file operations.
-
- """
- def build_virtual_root(self):
- return Dir({
- 'hi.txt': File("Hello, world!\n"),
- 'this.pyc': RealFile(__file__),
- })
-
-def test_too_many_opens():
- def entry_point(argv):
- try:
- open_files = []
- for i in range(500):
- fd = os.open('/hi.txt', os.O_RDONLY, 0777)
- open_files.append(fd)
- txt = os.read(fd, 100)
- if txt != "Hello, world!\n":
- print "Wrong content: %s" % txt
- except OSError as e:
- # We expect to get EMFILE, for opening too many files.
- if e.errno != errno.EMFILE:
- print "OSError: %s!" % (e.errno,)
- else:
- print "We opened 500 fake files! Shouldn't have been able to."
-
- for fd in open_files:
- os.close(fd)
-
- try:
- open_files = []
- for i in range(500):
- fd = os.open('/this.pyc', os.O_RDONLY, 0777)
- open_files.append(fd)
- except OSError as e:
- # We expect to get EMFILE, for opening too many files.
- if e.errno != errno.EMFILE:
- print "OSError: %s!" % (e.errno,)
- else:
- print "We opened 500 real files! Shouldn't have been able to."
-
- print "All ok!"
- return 0
- exe = compile(entry_point)
-
- proc = SandboxedProcWithFiles([exe])
- output, error = proc.communicate("")
- assert output == "All ok!\n"
- assert error == ""
-
-def test_fstat():
- def compare(a, b, i):
- if a != b:
- print "stat and fstat differ @%d: %s != %s" % (i, a, b)
-
- def entry_point(argv):
- try:
- # Open a file, and compare stat and fstat
- fd = os.open('/hi.txt', os.O_RDONLY, 0777)
- st = os.stat('/hi.txt')
- fs = os.fstat(fd)
- # RPython requires the index for stat to be a constant.. :(
- compare(st[0], fs[0], 0)
- compare(st[1], fs[1], 1)
- compare(st[2], fs[2], 2)
- compare(st[3], fs[3], 3)
- compare(st[4], fs[4], 4)
- compare(st[5], fs[5], 5)
- compare(st[6], fs[6], 6)
- compare(st[7], fs[7], 7)
- compare(st[8], fs[8], 8)
- compare(st[9], fs[9], 9)
- except OSError as e:
- print "OSError: %s" % (e.errno,)
- print "All ok!"
- return 0
- exe = compile(entry_point)
-
- proc = SandboxedProcWithFiles([exe])
- output, error = proc.communicate("")
- assert output == "All ok!\n"
- assert error == ""
-
-def test_lseek():
- def char_should_be(c, should):
- if c != should:
- print "Wrong char: '%s' should be '%s'" % (c, should)
-
- def entry_point(argv):
- fd = os.open('/hi.txt', os.O_RDONLY, 0777)
- char_should_be(os.read(fd, 1), "H")
- new = os.lseek(fd, 3, os.SEEK_CUR)
- if new != 4:
- print "Wrong offset, %d should be 4" % new
- char_should_be(os.read(fd, 1), "o")
- new = os.lseek(fd, -3, os.SEEK_END)
- if new != 11:
- print "Wrong offset, %d should be 11" % new
- char_should_be(os.read(fd, 1), "d")
- new = os.lseek(fd, 7, os.SEEK_SET)
- if new != 7:
- print "Wrong offset, %d should be 7" % new
- char_should_be(os.read(fd, 1), "w")
- print "All ok!"
- return 0
- exe = compile(entry_point)
-
- proc = SandboxedProcWithFiles([exe])
- output, error = proc.communicate("")
- assert output == "All ok!\n"
- assert error == ""
-
-def test_getuid():
- if not hasattr(os, 'getuid'):
- py.test.skip("posix only")
-
- def entry_point(argv):
- import os
- print "uid is %s" % os.getuid()
- print "euid is %s" % os.geteuid()
- print "gid is %s" % os.getgid()
- print "egid is %s" % os.getegid()
- return 0
- exe = compile(entry_point)
-
- proc = SandboxedProcWithFiles([exe])
- output, error = proc.communicate("")
- assert output == "uid is 1000\neuid is 1000\ngid is 1000\negid is 1000\n"
- assert error == ""
diff --git a/rpython/translator/sandbox/test/test_vfs.py b/rpython/translator/sandbox/test/test_vfs.py
deleted file mode 100644
--- a/rpython/translator/sandbox/test/test_vfs.py
+++ /dev/null
@@ -1,114 +0,0 @@
-import py
-import sys, stat, os
-from rpython.translator.sandbox.vfs import *
-from rpython.tool.udir import udir
-
-HASLINK = hasattr(os, 'symlink')
-
-def setup_module(mod):
- d = udir.ensure('test_vfs', dir=1)
- d.join('file1').write('somedata1')
- d.join('file2').write('somelongerdata2')
- os.chmod(str(d.join('file2')), stat.S_IWUSR) # unreadable
- d.join('.hidden').write('secret')
- d.ensure('subdir1', dir=1).join('subfile1').write('spam')
- d.ensure('.subdir2', dir=1).join('subfile2').write('secret as well')
- if HASLINK:
- d.join('symlink1').mksymlinkto(str(d.join('subdir1')))
- d.join('symlink2').mksymlinkto('.hidden')
- d.join('symlink3').mksymlinkto('BROKEN')
-
-
-def test_dir():
- d = Dir({'foo': Dir()})
- assert d.keys() == ['foo']
- py.test.raises(OSError, d.open)
- assert 0 <= d.getsize() <= sys.maxint
- d1 = d.join('foo')
- assert stat.S_ISDIR(d1.kind)
- assert d1.keys() == []
- py.test.raises(OSError, d.join, 'bar')
- st = d.stat()
- assert stat.S_ISDIR(st.st_mode)
- assert d.access(os.R_OK | os.X_OK)
- assert not d.access(os.W_OK)
-
-def test_file():
- f = File('hello world')
- assert stat.S_ISREG(f.kind)
- py.test.raises(OSError, f.keys)
- assert f.getsize() == 11
- h = f.open()
- data = h.read()
- assert data == 'hello world'
- h.close()
- st = f.stat()
- assert stat.S_ISREG(st.st_mode)
- assert st.st_size == 11
- assert f.access(os.R_OK)
- assert not f.access(os.W_OK)
-
-def test_realdir_realfile():
- for show_dotfiles in [False, True]:
- for follow_links in [False, True]:
- v_udir = RealDir(str(udir), show_dotfiles = show_dotfiles,
- follow_links = follow_links)
- v_test_vfs = v_udir.join('test_vfs')
- names = v_test_vfs.keys()
- names.sort()
- assert names == (show_dotfiles * ['.hidden', '.subdir2'] +
- ['file1', 'file2', 'subdir1'] +
- HASLINK * ['symlink1', 'symlink2', 'symlink3'])
- py.test.raises(OSError, v_test_vfs.open)
- assert 0 <= v_test_vfs.getsize() <= sys.maxint
-
- f = v_test_vfs.join('file1')
- assert f.open().read() == 'somedata1'
-
- f = v_test_vfs.join('file2')
- assert f.getsize() == len('somelongerdata2')
- if os.name != 'nt': # can't have unreadable files there?
- py.test.raises(OSError, f.open)
-
- py.test.raises(OSError, v_test_vfs.join, 'does_not_exist')
- py.test.raises(OSError, v_test_vfs.join, 'symlink3')
- if follow_links and HASLINK:
- d = v_test_vfs.join('symlink1')
- assert stat.S_ISDIR(d.stat().st_mode)
- assert d.keys() == ['subfile1']
- assert d.join('subfile1').open().read() == 'spam'
-
- f = v_test_vfs.join('symlink2')
- assert stat.S_ISREG(f.stat().st_mode)
- assert f.access(os.R_OK)
- assert f.open().read() == 'secret'
- else:
- py.test.raises(OSError, v_test_vfs.join, 'symlink1')
- py.test.raises(OSError, v_test_vfs.join, 'symlink2')
-
- if show_dotfiles:
- f = v_test_vfs.join('.hidden')
- assert f.open().read() == 'secret'
-
- d = v_test_vfs.join('.subdir2')
- assert d.keys() == ['subfile2']
- assert d.join('subfile2').open().read() == 'secret as well'
- else:
- py.test.raises(OSError, v_test_vfs.join, '.hidden')
- py.test.raises(OSError, v_test_vfs.join, '.subdir2')
-
-def test_realdir_exclude():
- xdir = udir.ensure('test_realdir_exclude', dir=1)
- xdir.ensure('test_realdir_exclude.yes')
- xdir.ensure('test_realdir_exclude.no')
- v_udir = RealDir(str(udir), exclude=['.no'])
- v_xdir = v_udir.join('test_realdir_exclude')
- assert 'test_realdir_exclude.yes' in v_xdir.keys()
- assert 'test_realdir_exclude.no' not in v_xdir.keys()
- v_xdir.join('test_realdir_exclude.yes') # works
- py.test.raises(OSError, v_xdir.join, 'test_realdir_exclude.no')
- # Windows and Mac tests, for the case
- py.test.raises(OSError, v_xdir.join, 'Test_RealDir_Exclude.no')
- py.test.raises(OSError, v_xdir.join, 'test_realdir_exclude.No')
- py.test.raises(OSError, v_xdir.join, 'test_realdir_exclude.nO')
- py.test.raises(OSError, v_xdir.join, 'test_realdir_exclude.NO')
diff --git a/rpython/translator/sandbox/vfs.py b/rpython/translator/sandbox/vfs.py
deleted file mode 100644
--- a/rpython/translator/sandbox/vfs.py
+++ /dev/null
@@ -1,137 +0,0 @@
-import os
-import stat, errno
-
-UID = 1000
-GID = 1000
-ATIME = MTIME = CTIME = 0
-INO_COUNTER = 0
-
-
-class FSObject(object):
- read_only = True
-
- def stat(self):
- try:
- st_ino = self._st_ino
- except AttributeError:
- global INO_COUNTER
- INO_COUNTER += 1
- st_ino = self._st_ino = INO_COUNTER
- st_dev = 1
- st_nlink = 1
- st_size = self.getsize()
- st_mode = self.kind
- st_mode |= stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
- if stat.S_ISDIR(self.kind):
- st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
- if self.read_only:
- st_uid = 0 # read-only files are virtually owned by root
- st_gid = 0
- else:
- st_uid = UID # read-write files are owned by this virtual user
- st_gid = GID
- st_atime = ATIME
- st_mtime = MTIME
- st_ctime = CTIME
- return os.stat_result(
- (st_mode, st_ino, st_dev, st_nlink, st_uid, st_gid,
- st_size, st_atime, st_mtime, st_ctime))
-
- def access(self, mode):
- s = self.stat()
- e_mode = s.st_mode & stat.S_IRWXO
- if UID == s.st_uid:
- e_mode |= (s.st_mode & stat.S_IRWXU) >> 6
- if GID == s.st_gid:
- e_mode |= (s.st_mode & stat.S_IRWXG) >> 3
- return (e_mode & mode) == mode
-
- def keys(self):
- raise OSError(errno.ENOTDIR, self)
-
- def open(self):
- raise OSError(errno.EACCES, self)
-
- def getsize(self):
- return 0
-
-
-class Dir(FSObject):
- kind = stat.S_IFDIR
- def __init__(self, entries={}):
- self.entries = entries
- def keys(self):
- return self.entries.keys()
- def join(self, name):
- try:
- return self.entries[name]
- except KeyError:
- raise OSError(errno.ENOENT, name)
-
-class RealDir(Dir):
- # If show_dotfiles=False, we pretend that all files whose name starts
- # with '.' simply don't exist. If follow_links=True, then symlinks are
- # transparently followed (they look like a regular file or directory to
- # the sandboxed process). If follow_links=False, the subprocess is
- # not allowed to access them at all. Finally, exclude is a list of
- # file endings that we filter out (note that we also filter out files
- # with the same ending but a different case, to be safe).
- def __init__(self, path, show_dotfiles=False, follow_links=False,
- exclude=[]):
- self.path = path
- self.show_dotfiles = show_dotfiles
- self.follow_links = follow_links
- self.exclude = [excl.lower() for excl in exclude]
- def __repr__(self):
- return '<RealDir %s>' % (self.path,)
- def keys(self):
- names = os.listdir(self.path)
- if not self.show_dotfiles:
- names = [name for name in names if not name.startswith('.')]
- for excl in self.exclude:
- names = [name for name in names if not name.lower().endswith(excl)]
- return names
- def join(self, name):
- if name.startswith('.') and not self.show_dotfiles:
- raise OSError(errno.ENOENT, name)
- for excl in self.exclude:
- if name.lower().endswith(excl):
- raise OSError(errno.ENOENT, name)
- path = os.path.join(self.path, name)
- if self.follow_links:
- st = os.stat(path)
- else:
- st = os.lstat(path)
- if stat.S_ISDIR(st.st_mode):
- return RealDir(path, show_dotfiles = self.show_dotfiles,
- follow_links = self.follow_links,
- exclude = self.exclude)
- elif stat.S_ISREG(st.st_mode):
- return RealFile(path)
- else:
- # don't allow access to symlinks and other special files
- raise OSError(errno.EACCES, path)
-
-class File(FSObject):
- kind = stat.S_IFREG
- def __init__(self, data=''):
- self.data = data
- def getsize(self):
- return len(self.data)
- def open(self):
- import cStringIO
- return cStringIO.StringIO(self.data)
-
-class RealFile(File):
- def __init__(self, path, mode=0):
- self.path = path
- self.kind |= mode
- def __repr__(self):
- return '<RealFile %s>' % (self.path,)
- def getsize(self):
- return os.stat(self.path).st_size
- def open(self):
- try:
- return open(self.path, "rb")
- except IOError as e:
- raise OSError(e.errno, "open failed")
More information about the pypy-commit mailing list