[pypy-commit] pypy vmprof-review: Tweaks, and implementation of a wait-free, best-effort algorithm to
arigo
noreply at buildbot.pypy.org
Mon Aug 3 19:26:07 CEST 2015
Author: Armin Rigo <arigo at tunes.org>
Branch: vmprof-review
Changeset: r78755:f99230e64cbc
Date: 2015-08-03 19:26 +0200
http://bitbucket.org/pypy/pypy/changeset/f99230e64cbc/
Log: Tweaks, and implementation of a wait-free, best-effort algorithm to
handle multiple threads (some possibly in signals) that all try to
write chunks of data to the same file
diff --git a/rpython/rlib/rvmprof/cintf.py b/rpython/rlib/rvmprof/cintf.py
--- a/rpython/rlib/rvmprof/cintf.py
+++ b/rpython/rlib/rvmprof/cintf.py
@@ -35,6 +35,9 @@
vmprof_disable = rffi.llexternal("rpython_vmprof_disable", [], rffi.INT,
compilation_info=eci,
save_err=rffi.RFFI_SAVE_ERRNO)
+vmprof_write_buf = rffi.llexternal("rpython_vmprof_write_buf",
+ [rffi.CCHARP, rffi.LONG],
+ lltype.Void, compilation_info=eci)
## vmprof_register_virtual_function = rffi.llexternal(
## "vmprof_register_virtual_function",
diff --git a/rpython/rlib/rvmprof/rvmprof.py b/rpython/rlib/rvmprof/rvmprof.py
--- a/rpython/rlib/rvmprof/rvmprof.py
+++ b/rpython/rlib/rvmprof/rvmprof.py
@@ -8,6 +8,7 @@
from rpython.rtyper.lltypesystem import rffi
MAX_CODES = 8000
+MAX_FUNC_NAME = 128
# ____________________________________________________________
@@ -29,7 +30,6 @@
def _cleanup_(self):
self.is_enabled = False
- self.ever_enabled = False
self.fileno = -1
self._current_codes = None
if sys.maxint == 2147483647:
@@ -93,15 +93,13 @@
if not (1e-6 <= interval < 1.0):
raise VMProfError("bad value for 'interval'")
interval_usec = int(interval * 1000000.0)
- #
+
+ p_error = cintf.vmprof_init()
+ if p_error:
+ raise VMProfError(rffi.charp2str(p_error))
+
self.fileno = fileno
self._write_header(interval_usec)
- if not self.ever_enabled:
- if we_are_translated():
- p_error = cintf.vmprof_init()
- if p_error:
- raise VMProfError(rffi.charp2str(p_error))
- self.ever_enabled = True
self._gather_all_code_objs()
if we_are_translated():
# does not work untranslated
@@ -126,6 +124,8 @@
raise VMProfError(os.strerror(rposix.get_saved_errno()))
def _write_code_registration(self, uid, name):
+ if len(name) > MAX_FUNC_NAME:
+ name = name[:MAX_FUNC_NAME]
b = self._current_codes
if b is None:
b = self._current_codes = StringBuilder()
@@ -139,20 +139,7 @@
def _flush_codes(self):
buf = self._current_codes.build()
self._current_codes = None
- self._carefully_write(buf)
-
- def _carefully_write(self, buf):
- fd = self.fileno
- assert fd >= 0
- if not buf:
- return
- cintf.vmprof_ignore_signals(True)
- try:
- while len(buf) > 0:
- num = os.write(fd, buf)
- buf = buf[num:]
- finally:
- cintf.vmprof_ignore_signals(False)
+ cintf.vmprof_write_buf(buf, len(buf))
def _write_header(self, interval_usec):
b = StringBuilder()
@@ -164,7 +151,8 @@
b.append('\x04') # interp name
b.append(chr(len('pypy')))
b.append('pypy')
- self._carefully_write(b.build())
+ buf = b.build()
+ cintf.vmprof_write_buf(buf, len(buf))
def _write_long_to_string_builder(l, b):
diff --git a/rpython/rlib/rvmprof/src/rvmprof.c b/rpython/rlib/rvmprof/src/rvmprof.c
--- a/rpython/rlib/rvmprof/src/rvmprof.c
+++ b/rpython/rlib/rvmprof/src/rvmprof.c
@@ -1,3 +1,22 @@
+/* VMPROF
+ *
+ * statistical sampling profiler specifically designed to profile programs
+ * which run on a Virtual Machine and/or bytecode interpreter, such as Python,
+ * etc.
+ *
+ * The logic to dump the C stack traces is partly stolen from the code in
+ * gperftools.
+ * The file "getpc.h" has been entirely copied from gperftools.
+ *
+ * Tested only on gcc, linux, x86_64.
+ *
+ * Copyright (C) 2014-2015
+ * Antonio Cuni - anto.cuni at gmail.com
+ * Maciej Fijalkowski - fijall at gmail.com
+ * Armin Rigo - arigo at tunes.org
+ *
+ */
+
#define _GNU_SOURCE 1
@@ -16,8 +35,6 @@
#endif
-#include "rvmprof_getpc.h"
-#include "rvmprof_base.h"
#include <dlfcn.h>
#include <assert.h>
#include <pthread.h>
@@ -28,6 +45,9 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
+#include "rvmprof_getpc.h"
+#include "rvmprof_unwind.h"
+#include "rvmprof_mt.h"
/************************************************************/
@@ -57,6 +77,8 @@
if (!(unw_step = dlsym(libhandle, "_ULx86_64_step")))
goto error;
}
+ if (prepare_concurrent_bufs() < 0)
+ return "out of memory";
return NULL;
error:
@@ -86,6 +108,9 @@
* *************************************************************
*/
+#define MAX_FUNC_NAME 128
+#define MAX_STACK_DEPTH ((SINGLE_BUF_SIZE / sizeof(void *)) - 4)
+
#define MARKER_STACKTRACE '\x01'
#define MARKER_VIRTUAL_IP '\x02'
#define MARKER_TRAILER '\x03'
@@ -94,32 +119,26 @@
static long profile_interval_usec = 0;
static char atfork_hook_installed = 0;
-static int _write_all(const void *buf, size_t bufsize)
-{
- while (bufsize > 0) {
- ssize_t count = write(profile_file, buf, bufsize);
- if (count <= 0)
- return -1; /* failed */
- buf += count;
- bufsize -= count;
- }
- return 0;
-}
static void sigprof_handler(int sig_nr, siginfo_t* info, void *ucontext) {
+ if (ignore_signals)
+ return;
int saved_errno = errno;
- /*
+#if 0
void* stack[MAX_STACK_DEPTH];
stack[0] = GetPC((ucontext_t*)ucontext);
- int depth = frame_forcer(get_stack_trace(stack+1, MAX_STACK_DEPTH-1, ucontext));
+ int depth = get_stack_trace(stack+1, MAX_STACK_DEPTH-1, ucontext);
depth++; // To account for pc value in stack[0];
prof_write_stacktrace(stack, depth, 1);
- */
+#endif
errno = saved_errno;
}
-/************************************************************/
+/* *************************************************************
+ * the setup and teardown functions
+ * *************************************************************
+ */
static int install_sigprof_handler(void)
{
@@ -218,6 +237,18 @@
return -1;
}
+static int _write_all(const void *buf, size_t bufsize)
+{
+ while (bufsize > 0) {
+ ssize_t count = write(profile_file, buf, bufsize);
+ if (count <= 0)
+ return -1; /* failed */
+ buf += count;
+ bufsize -= count;
+ }
+ return 0;
+}
+
static int close_profile(void)
{
int srcfd;
@@ -272,5 +303,19 @@
return -1;
if (remove_sigprof_handler() == -1)
return -1;
+ shutdown_concurrent_bufs(profile_file);
return close_profile();
}
+
+RPY_EXTERN
+void rpython_vmprof_write_buf(char *buf, long size)
+{
+ struct profbuf_s *p = reserve_buffer(profile_file);
+
+ if (size > SINGLE_BUF_SIZE)
+ size = SINGLE_BUF_SIZE;
+ memcpy(p->data, buf, size);
+ p->data_size = size;
+
+ commit_buffer(profile_file, p);
+}
diff --git a/rpython/rlib/rvmprof/src/rvmprof.h b/rpython/rlib/rvmprof/src/rvmprof.h
--- a/rpython/rlib/rvmprof/src/rvmprof.h
+++ b/rpython/rlib/rvmprof/src/rvmprof.h
@@ -1,5 +1,6 @@
RPY_EXTERN char *rpython_vmprof_init(void);
-RPY_EXTERN void vmprof_ignore_signals(int);
+RPY_EXTERN void rpython_vmprof_ignore_signals(int);
RPY_EXTERN int rpython_vmprof_enable(int, long);
RPY_EXTERN int rpython_vmprof_disable(void);
+RPY_EXTERN void rpython_vmprof_write_buf(char *, long);
diff --git a/rpython/rlib/rvmprof/src/rvmprof_mt.h b/rpython/rlib/rvmprof/src/rvmprof_mt.h
new file mode 100644
--- /dev/null
+++ b/rpython/rlib/rvmprof/src/rvmprof_mt.h
@@ -0,0 +1,156 @@
+/* Support for multithreaded write() operations */
+
+#include <unistd.h>
+#include <sys/mman.h>
+#include <string.h>
+
+#define SINGLE_BUF_SIZE (8192 - 2 * sizeof(unsigned int))
+#define MAX_NUM_BUFFERS 32
+
+#if defined(__i386__) || defined(__amd64__)
+ static inline void write_fence(void) { asm("" : : : "memory"); }
+#else
+ static inline void write_fence(void) { __sync_synchronize(); }
+#endif
+
+
+#define PROFBUF_UNUSED 0
+#define PROFBUF_FILLING 1
+#define PROFBUF_READY 2
+
+
+struct profbuf_s {
+ unsigned int data_size;
+ unsigned int data_offset;
+ char data[SINGLE_BUF_SIZE];
+};
+
+static char volatile profbuf_state[MAX_NUM_BUFFERS];
+static struct profbuf_s *profbuf_all_buffers = NULL;
+static int volatile profbuf_write_lock = 2;
+
+
+static int prepare_concurrent_bufs(void)
+{
+ assert(sizeof(struct profbuf_s) == 8192);
+
+ if (profbuf_all_buffers != NULL) {
+ munmap(profbuf_all_buffers, sizeof(struct profbuf_s) * MAX_NUM_BUFFERS);
+ profbuf_all_buffers = NULL;
+ }
+ profbuf_all_buffers = mmap(NULL, sizeof(struct profbuf_s) * MAX_NUM_BUFFERS,
+ PROT_READ | PROT_WRITE,
+ MAP_PRIVATE | MAP_ANONYMOUS,
+ -1, 0);
+ if (profbuf_all_buffers == MAP_FAILED) {
+ profbuf_all_buffers = NULL;
+ return -1;
+ }
+ memset((char *)profbuf_state, PROFBUF_UNUSED, sizeof(profbuf_state));
+ profbuf_write_lock = 0;
+ return 0;
+}
+
+static void _write_single_ready_buffer(int fd, long i)
+{
+ struct profbuf_s *p = &profbuf_all_buffers[i];
+ ssize_t count = write(fd, p->data + p->data_offset, p->data_size);
+ if (count == p->data_size) {
+ profbuf_state[i] = PROFBUF_UNUSED;
+ }
+ else if (count > 0) {
+ p->data_offset += count;
+ p->data_size -= count;
+ }
+}
+
+static void _write_ready_buffers(int fd)
+{
+ long i;
+ int has_write_lock = 0;
+
+ for (i = 0; i < MAX_NUM_BUFFERS; i++) {
+ if (profbuf_state[i] == PROFBUF_READY) {
+ if (!has_write_lock) {
+ if (!__sync_bool_compare_and_swap(&profbuf_write_lock, 0, 1))
+ return; /* can't acquire the write lock, give up */
+ has_write_lock = 1;
+ }
+ _write_single_ready_buffer(fd, i);
+ }
+ }
+ if (has_write_lock)
+ profbuf_write_lock = 0;
+}
+
+static struct profbuf_s *reserve_buffer(int fd)
+{
+ /* Tries to enter a region of code that fills one buffer. If
+ successful, returns the profbuf_s. It fails only if the
+ concurrent buffers are all busy (extreme multithreaded usage).
+
+ This might call write() to emit the data sitting in
+ previously-prepared buffers. In case of write() error, the
+ error is ignored but unwritten data stays in the buffers.
+ */
+ long i;
+
+ _write_ready_buffers(fd);
+
+ for (i = 0; i < MAX_NUM_BUFFERS; i++) {
+ if (profbuf_state[i] == PROFBUF_UNUSED &&
+ __sync_bool_compare_and_swap(&profbuf_state[i], PROFBUF_UNUSED,
+ PROFBUF_FILLING)) {
+ struct profbuf_s *p = &profbuf_all_buffers[i];
+ p->data_size = 0;
+ p->data_offset = 0;
+ return p;
+ }
+ }
+ /* no unused buffer found */
+ return NULL;
+}
+
+static void commit_buffer(int fd, struct profbuf_s *buf)
+{
+ /* Leaves a region of code that filled 'buf'.
+
+ This might call write() to emit the data now ready. In case of
+ write() error, the error is ignored but unwritten data stays in
+ the buffers.
+ */
+
+ /* Make sure every thread sees the full content of 'buf' */
+ write_fence();
+
+ /* Then set the 'ready' flag */
+ long i = buf - profbuf_all_buffers;
+ assert(profbuf_state[i] == PROFBUF_FILLING);
+ profbuf_state[i] = PROFBUF_READY;
+
+ if (!__sync_bool_compare_and_swap(&profbuf_write_lock, 0, 1)) {
+ /* can't acquire the write lock, ignore */
+ }
+ else {
+ _write_single_ready_buffer(fd, i);
+ profbuf_write_lock = 0;
+ }
+}
+
+static void shutdown_concurrent_bufs(int fd)
+{
+ retry:
+ usleep(1);
+ if (!__sync_bool_compare_and_swap(&profbuf_write_lock, 0, 2)) {
+ /* spin loop */
+ goto retry;
+ }
+
+ /* last attempt to flush buffers */
+ int i;
+ for (i = 0; i < MAX_NUM_BUFFERS; i++) {
+ if (profbuf_state[i] == PROFBUF_READY) {
+ _write_single_ready_buffer(fd, i);
+ }
+ }
+}
diff --git a/rpython/rlib/rvmprof/src/rvmprof_base.h b/rpython/rlib/rvmprof/src/rvmprof_unwind.h
rename from rpython/rlib/rvmprof/src/rvmprof_base.h
rename to rpython/rlib/rvmprof/src/rvmprof_unwind.h
More information about the pypy-commit
mailing list