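[pypy-commit] pypy vmprof-review: Tweaks, and implementation of a wait-free, best-effort algorithm to handle multiple threads (some possibly in signals) that all try to write chunks of data to the same file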

arigo noreply at buildbot.pypy.org
Mon Aug 3 19:26:07 CEST 2015


Author: Armin Rigo <arigo at tunes.org>
Branch: vmprof-review
Changeset: r78755:f99230e64cbc
Date: 2015-08-03 19:26 +0200
http://bitbucket.org/pypy/pypy/changeset/f99230e64cbc/

Log:	Tweaks, and implementation of a wait-free, best-effort algorithm to
	handle multiple threads (some possibly in signals) that all try to
	write chunks of data to the same file

diff --git a/rpython/rlib/rvmprof/cintf.py b/rpython/rlib/rvmprof/cintf.py
--- a/rpython/rlib/rvmprof/cintf.py
+++ b/rpython/rlib/rvmprof/cintf.py
@@ -35,6 +35,9 @@
 vmprof_disable = rffi.llexternal("rpython_vmprof_disable", [], rffi.INT,
                                  compilation_info=eci,
                                  save_err=rffi.RFFI_SAVE_ERRNO)
+vmprof_write_buf = rffi.llexternal("rpython_vmprof_write_buf",
+                                   [rffi.CCHARP, rffi.LONG],
+                                   lltype.Void, compilation_info=eci)
 
 ## vmprof_register_virtual_function = rffi.llexternal(
 ##     "vmprof_register_virtual_function",
diff --git a/rpython/rlib/rvmprof/rvmprof.py b/rpython/rlib/rvmprof/rvmprof.py
--- a/rpython/rlib/rvmprof/rvmprof.py
+++ b/rpython/rlib/rvmprof/rvmprof.py
@@ -8,6 +8,7 @@
 from rpython.rtyper.lltypesystem import rffi
 
 MAX_CODES = 8000
+MAX_FUNC_NAME = 128
 
 # ____________________________________________________________
 
@@ -29,7 +30,6 @@
 
     def _cleanup_(self):
         self.is_enabled = False
-        self.ever_enabled = False
         self.fileno = -1
         self._current_codes = None
         if sys.maxint == 2147483647:
@@ -93,15 +93,13 @@
         if not (1e-6 <= interval < 1.0):
             raise VMProfError("bad value for 'interval'")
         interval_usec = int(interval * 1000000.0)
-        #
+
+        p_error = cintf.vmprof_init()
+        if p_error:
+            raise VMProfError(rffi.charp2str(p_error))
+
         self.fileno = fileno
         self._write_header(interval_usec)
-        if not self.ever_enabled:
-            if we_are_translated():
-                p_error = cintf.vmprof_init()
-                if p_error:
-                    raise VMProfError(rffi.charp2str(p_error))
-            self.ever_enabled = True
         self._gather_all_code_objs()
         if we_are_translated():
             # does not work untranslated
@@ -126,6 +124,8 @@
                 raise VMProfError(os.strerror(rposix.get_saved_errno()))
 
     def _write_code_registration(self, uid, name):
+        if len(name) > MAX_FUNC_NAME:
+            name = name[:MAX_FUNC_NAME]
         b = self._current_codes
         if b is None:
             b = self._current_codes = StringBuilder()
@@ -139,20 +139,7 @@
     def _flush_codes(self):
         buf = self._current_codes.build()
         self._current_codes = None
-        self._carefully_write(buf)
-
-    def _carefully_write(self, buf):
-        fd = self.fileno
-        assert fd >= 0
-        if not buf:
-            return
-        cintf.vmprof_ignore_signals(True)
-        try:
-            while len(buf) > 0:
-                num = os.write(fd, buf)
-                buf = buf[num:]
-        finally:
-            cintf.vmprof_ignore_signals(False)
+        cintf.vmprof_write_buf(buf, len(buf))  # NOTE: the C side truncates a chunk to SINGLE_BUF_SIZE bytes (just under 8 KB)
 
     def _write_header(self, interval_usec):
         b = StringBuilder()
@@ -164,7 +151,8 @@
         b.append('\x04') # interp name
         b.append(chr(len('pypy')))
         b.append('pypy')
-        self._carefully_write(b.build())
+        buf = b.build()
+        cintf.vmprof_write_buf(buf, len(buf))
 
 
 def _write_long_to_string_builder(l, b):
diff --git a/rpython/rlib/rvmprof/src/rvmprof.c b/rpython/rlib/rvmprof/src/rvmprof.c
--- a/rpython/rlib/rvmprof/src/rvmprof.c
+++ b/rpython/rlib/rvmprof/src/rvmprof.c
@@ -1,3 +1,22 @@
+/* VMPROF
+ *
+ * statistical sampling profiler specifically designed to profile programs
+ * which run on a Virtual Machine and/or bytecode interpreter, such as Python,
+ * etc.
+ *
+ * The logic to dump the C stack traces is partly stolen from the code in
+ * gperftools.
+ * The file "getpc.h" has been entirely copied from gperftools.
+ *
+ * Tested only on gcc, linux, x86_64.
+ *
+ * Copyright (C) 2014-2015
+ *   Antonio Cuni - anto.cuni at gmail.com
+ *   Maciej Fijalkowski - fijall at gmail.com
+ *   Armin Rigo - arigo at tunes.org
+ *
+ */
+
 #define _GNU_SOURCE 1
 
 
@@ -16,8 +35,6 @@
 #endif
 
 
-#include "rvmprof_getpc.h"
-#include "rvmprof_base.h"
 #include <dlfcn.h>
 #include <assert.h>
 #include <pthread.h>
@@ -28,6 +45,9 @@
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <fcntl.h>
+#include "rvmprof_getpc.h"
+#include "rvmprof_unwind.h"
+#include "rvmprof_mt.h"
 
 
 /************************************************************/
@@ -57,6 +77,8 @@
         if (!(unw_step = dlsym(libhandle, "_ULx86_64_step")))
             goto error;
     }
+    if (prepare_concurrent_bufs() < 0)
+        return "out of memory";
     return NULL;
 
  error:
@@ -86,6 +108,9 @@
  * *************************************************************
  */
 
+#define MAX_FUNC_NAME 128
+#define MAX_STACK_DEPTH ((SINGLE_BUF_SIZE / sizeof(void *)) - 4)
+
 #define MARKER_STACKTRACE '\x01'
 #define MARKER_VIRTUAL_IP '\x02'
 #define MARKER_TRAILER '\x03'
@@ -94,32 +119,26 @@
 static long profile_interval_usec = 0;
 static char atfork_hook_installed = 0;
 
-static int _write_all(const void *buf, size_t bufsize)
-{
-    while (bufsize > 0) {
-        ssize_t count = write(profile_file, buf, bufsize);
-        if (count <= 0)
-            return -1;   /* failed */
-        buf += count;
-        bufsize -= count;
-    }
-    return 0;
-}
 
 static void sigprof_handler(int sig_nr, siginfo_t* info, void *ucontext) {
+    if (ignore_signals)   /* sampling suppressed: return without touching errno */
+        return;
     int saved_errno = errno;
-    /*
+#if 0   /* sampling body disabled: the handler currently only preserves errno */
     void* stack[MAX_STACK_DEPTH];
     stack[0] = GetPC((ucontext_t*)ucontext);
-    int depth = frame_forcer(get_stack_trace(stack+1, MAX_STACK_DEPTH-1, ucontext));
+    int depth = get_stack_trace(stack+1, MAX_STACK_DEPTH-1, ucontext);
     depth++;  // To account for pc value in stack[0];
     prof_write_stacktrace(stack, depth, 1);
-    */
+#endif   /* 0 */
     errno = saved_errno;
 }
 
 
-/************************************************************/
+/* *************************************************************
+ * the setup and teardown functions
+ * *************************************************************
+ */
 
 static int install_sigprof_handler(void)
 {
@@ -218,6 +237,18 @@
     return -1;
 }
 
+static int _write_all(const void *buf, size_t bufsize)
+{
+    while (bufsize > 0) {
+        ssize_t count = write(profile_file, buf, bufsize);
+        if (count <= 0)
+            return -1;   /* failed */
+        buf += count;
+        bufsize -= count;
+    }
+    return 0;
+}
+
 static int close_profile(void)
 {
     int srcfd;
@@ -272,5 +303,34 @@
         return -1;
     if (remove_sigprof_handler() == -1)
         return -1;
+    shutdown_concurrent_bufs(profile_file);
     return close_profile();
 }
+
+/* Queue one chunk of data for writing to the profile file.
+ *
+ * Goes through the concurrent buffers of rvmprof_mt.h, so it never
+ * waits for other threads: the chunk is write()n right away if the
+ * write lock is free, or left for a later caller to flush otherwise.
+ * Chunks longer than SINGLE_BUF_SIZE bytes are truncated.  The chunk
+ * is silently dropped if 'size' is negative or if no concurrent
+ * buffer is available.
+ */
+RPY_EXTERN
+void rpython_vmprof_write_buf(char *buf, long size)
+{
+    struct profbuf_s *p;
+
+    if (size < 0)
+        return;
+    p = reserve_buffer(profile_file);
+    if (p == NULL)
+        return;   /* all concurrent buffers are busy: drop this chunk */
+
+    if (size > SINGLE_BUF_SIZE)
+        size = SINGLE_BUF_SIZE;
+    memcpy(p->data, buf, size);
+    p->data_size = size;
+
+    commit_buffer(profile_file, p);
+}
diff --git a/rpython/rlib/rvmprof/src/rvmprof.h b/rpython/rlib/rvmprof/src/rvmprof.h
--- a/rpython/rlib/rvmprof/src/rvmprof.h
+++ b/rpython/rlib/rvmprof/src/rvmprof.h
@@ -1,5 +1,6 @@
 
 RPY_EXTERN char *rpython_vmprof_init(void);
-RPY_EXTERN void vmprof_ignore_signals(int);
+RPY_EXTERN void rpython_vmprof_ignore_signals(int);
 RPY_EXTERN int rpython_vmprof_enable(int, long);
 RPY_EXTERN int rpython_vmprof_disable(void);
+RPY_EXTERN void rpython_vmprof_write_buf(char *, long);
diff --git a/rpython/rlib/rvmprof/src/rvmprof_mt.h b/rpython/rlib/rvmprof/src/rvmprof_mt.h
new file mode 100644
--- /dev/null
+++ b/rpython/rlib/rvmprof/src/rvmprof_mt.h
@@ -0,0 +1,207 @@
+/* Support for multithreaded write() operations
+ *
+ * All threads -- including code running inside signal handlers -- share
+ * a fixed pool of MAX_NUM_BUFFERS buffers.  A writer reserves an UNUSED
+ * buffer with a compare-and-swap (reserve_buffer()), fills it, marks it
+ * READY (commit_buffer()), and then tries to take 'profbuf_write_lock'
+ * to write() it out.  Nobody ever waits for that lock: if it is busy,
+ * the data stays in its READY buffer until a later caller flushes it.
+ * This is best-effort: reserve_buffer() returns NULL when every buffer
+ * is busy, and a write() error just leaves the data queued.
+ */
+
+#include <unistd.h>
+#include <sys/mman.h>
+#include <string.h>
+
+#define SINGLE_BUF_SIZE  (8192 - 2 * sizeof(unsigned int))
+#define MAX_NUM_BUFFERS  32
+
+/* Orders the stores that fill a buffer before the store that marks it
+   READY.  x86 does not reorder stores with other stores, so a compiler
+   barrier is enough there; other CPUs get a full hardware barrier.
+   NOTE(review): on weakly-ordered CPUs the reading side may also need
+   a barrier between seeing READY and reading the data -- confirm. */
+#if defined(__i386__) || defined(__amd64__)
+  static inline void write_fence(void) { asm("" : : : "memory"); }
+#else
+  static inline void write_fence(void) { __sync_synchronize(); }
+#endif
+
+
+/* Values of profbuf_state[i] */
+#define PROFBUF_UNUSED   0     /* free: may be reserved by anyone */
+#define PROFBUF_FILLING  1     /* reserved, being filled by its owner */
+#define PROFBUF_READY    2     /* filled, waiting to be written out */
+
+/* Values of profbuf_write_lock */
+#define PROFBUF_LOCK_FREE      0
+#define PROFBUF_LOCK_HELD      1   /* someone is calling write() */
+#define PROFBUF_LOCK_SHUTDOWN  2   /* buffers not prepared, or shut down */
+
+
+struct profbuf_s {
+    unsigned int data_size;     /* number of bytes still to write */
+    unsigned int data_offset;   /* where those bytes start in 'data' */
+    char data[SINGLE_BUF_SIZE];
+};
+
+static char volatile profbuf_state[MAX_NUM_BUFFERS];
+static struct profbuf_s *profbuf_all_buffers = NULL;
+static int volatile profbuf_write_lock = PROFBUF_LOCK_SHUTDOWN;
+
+
+/* (Re)allocate the buffer pool, mark every buffer UNUSED and open the
+   write lock.  Returns 0 on success, -1 if mmap() fails.  Any previous
+   pool is munmap()ed, so no other thread may be using it meanwhile. */
+static int prepare_concurrent_bufs(void)
+{
+    assert(sizeof(struct profbuf_s) == 8192);
+
+    if (profbuf_all_buffers != NULL) {
+        munmap(profbuf_all_buffers, sizeof(struct profbuf_s) * MAX_NUM_BUFFERS);
+        profbuf_all_buffers = NULL;
+    }
+    profbuf_all_buffers = mmap(NULL, sizeof(struct profbuf_s) * MAX_NUM_BUFFERS,
+                               PROT_READ | PROT_WRITE,
+                               MAP_PRIVATE | MAP_ANONYMOUS,
+                               -1, 0);
+    if (profbuf_all_buffers == MAP_FAILED) {
+        profbuf_all_buffers = NULL;
+        return -1;
+    }
+    memset((char *)profbuf_state, PROFBUF_UNUSED, sizeof(profbuf_state));
+    profbuf_write_lock = PROFBUF_LOCK_FREE;
+    return 0;
+}
+
+/* Try to write() out the READY buffer number 'i'; the caller must hold
+   the write lock.  A complete write releases the buffer (UNUSED); a
+   partial write keeps it READY with only the remaining bytes.  Returns
+   1 if progress was made, 0 if write() failed or wrote nothing. */
+static int _write_single_ready_buffer(int fd, long i)
+{
+    struct profbuf_s *p = &profbuf_all_buffers[i];
+    ssize_t count = write(fd, p->data + p->data_offset, p->data_size);
+
+    if (count == (ssize_t)p->data_size) {
+        profbuf_state[i] = PROFBUF_UNUSED;
+        return 1;
+    }
+    if (count > 0) {
+        p->data_offset += count;
+        p->data_size -= count;
+        return 1;
+    }
+    return 0;
+}
+
+/* Write out every READY buffer, in index order, if the write lock can
+   be taken; otherwise give up at once and leave them queued. */
+static void _write_ready_buffers(int fd)
+{
+    long i;
+    int has_write_lock = 0;
+
+    for (i = 0; i < MAX_NUM_BUFFERS; i++) {
+        if (profbuf_state[i] == PROFBUF_READY) {
+            if (!has_write_lock) {
+                if (!__sync_bool_compare_and_swap(&profbuf_write_lock,
+                                                  PROFBUF_LOCK_FREE,
+                                                  PROFBUF_LOCK_HELD))
+                    return;   /* can't acquire the write lock, give up */
+                has_write_lock = 1;
+            }
+            _write_single_ready_buffer(fd, i);
+        }
+    }
+    if (has_write_lock)
+        profbuf_write_lock = PROFBUF_LOCK_FREE;
+}
+
+static struct profbuf_s *reserve_buffer(int fd)
+{
+    /* Tries to enter a region of code that fills one buffer.  If
+       successful, returns the profbuf_s with 'data_size' and
+       'data_offset' reset to 0; the caller fills 'data', sets
+       'data_size', and must then call commit_buffer().  Returns NULL
+       if the buffers were never allocated, or if they are all busy
+       (extreme multithreaded usage).
+
+       This might call write() to emit the data sitting in
+       previously-prepared buffers.  In case of write() error, the
+       error is ignored but unwritten data stays in the buffers.
+    */
+    long i;
+
+    if (profbuf_all_buffers == NULL)
+        return NULL;   /* prepare_concurrent_bufs() not called, or failed */
+
+    _write_ready_buffers(fd);
+
+    for (i = 0; i < MAX_NUM_BUFFERS; i++) {
+        if (profbuf_state[i] == PROFBUF_UNUSED &&
+            __sync_bool_compare_and_swap(&profbuf_state[i], PROFBUF_UNUSED,
+                                         PROFBUF_FILLING)) {
+            struct profbuf_s *p = &profbuf_all_buffers[i];
+            p->data_size = 0;
+            p->data_offset = 0;
+            return p;
+        }
+    }
+    /* no unused buffer found */
+    return NULL;
+}
+
+static void commit_buffer(int fd, struct profbuf_s *buf)
+{
+    /* Leaves a region of code that filled 'buf', which must come from
+       reserve_buffer() and must not be touched by the caller afterwards.
+
+       This might call write() to emit the data now ready.  In case of
+       write() error, the error is ignored but unwritten data stays in
+       the buffers.
+    */
+    long i = buf - profbuf_all_buffers;
+
+    /* Make sure every thread sees the full content of 'buf' */
+    write_fence();
+
+    /* Then set the 'ready' flag */
+    assert(profbuf_state[i] == PROFBUF_FILLING);
+    profbuf_state[i] = PROFBUF_READY;
+
+    /* Write it out right away if nobody else holds the write lock;
+       otherwise it stays READY until a later caller flushes it. */
+    if (__sync_bool_compare_and_swap(&profbuf_write_lock, PROFBUF_LOCK_FREE,
+                                     PROFBUF_LOCK_HELD)) {
+        _write_single_ready_buffer(fd, i);
+        profbuf_write_lock = PROFBUF_LOCK_FREE;
+    }
+}
+
+/* Close the write lock (until the next prepare_concurrent_bufs()) and
+   make a last attempt to flush every READY buffer, retrying partial
+   writes.  Sleeps 1us between tries while another thread holds the
+   lock.  Returns at once if the lock is already shut down (buffers
+   never prepared, or a repeated call): waiting for it to become free
+   would spin forever. */
+static void shutdown_concurrent_bufs(int fd)
+{
+    long i;
+
+    while (!__sync_bool_compare_and_swap(&profbuf_write_lock,
+                                         PROFBUF_LOCK_FREE,
+                                         PROFBUF_LOCK_SHUTDOWN)) {
+        if (profbuf_write_lock == PROFBUF_LOCK_SHUTDOWN)
+            return;   /* never prepared, or already shut down */
+        usleep(1);    /* another thread is writing: let it finish */
+    }
+
+    /* last attempt to flush buffers */
+    for (i = 0; i < MAX_NUM_BUFFERS; i++) {
+        while (profbuf_state[i] == PROFBUF_READY &&
+               _write_single_ready_buffer(fd, i))
+            ;   /* keep going while write() makes progress */
+    }
+}
diff --git a/rpython/rlib/rvmprof/src/rvmprof_base.h b/rpython/rlib/rvmprof/src/rvmprof_unwind.h
rename from rpython/rlib/rvmprof/src/rvmprof_base.h
rename to rpython/rlib/rvmprof/src/rvmprof_unwind.h


More information about the pypy-commit mailing list