[C++-sig] boost python wrapping boost message queue problem

Holger Joukl Holger.Joukl at LBBW.de
Fri Jan 9 09:40:07 CET 2015


Hi,

> I wrapped the boost ipc message queue into Python using Boost.Python.
> Everything goes fine except I found that
>
> 1. the try_receive / try_send will block python when the mq is empty / full.
> 2. when I use the wrapped message queue in one Python thread, any
> other Python thread would be blocked and won't execute.

I don't know about the boost ipc message queue, but my guess is that
try_receive(...) and try_send(...) are blocking calls(?) and you'll need to
allow Python threads to run while blocking.

I.e. you'd need to release Python's global interpreter lock (GIL) before
the blocking call and reacquire it when the call returns by using the
Py_BEGIN_ALLOW_THREADS / Py_END_ALLOW_THREADS macros.

Looks to me like you could do so in your message_queue class's methods,
something similar to

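#include <vector>

boost::python::str message_queue::try_receive(size_t m_size) {
  std::vector<char> buffer(m_size);  // freed automatically, unlike new char[]
  bi::message_queue::size_type recvd_size = 0;
  unsigned int priority = 0;
  bool received;  // declared outside: the macros open and close a block
  Py_BEGIN_ALLOW_THREADS;  // release the GIL: no Python API calls until END
  received = p_message_queue->try_receive(&buffer[0], m_size, recvd_size, priority);
  Py_END_ALLOW_THREADS;    // reacquire the GIL
  if (!received)
    return boost::python::str();  // queue was empty: return ""
  // return only the bytes actually received, not the whole buffer
  return boost::python::str(&buffer[0], recvd_size);
}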

(fully untested code!)
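One caveat with the macro pair: Py_BEGIN_ALLOW_THREADS expands to an opening
brace plus a PyEval_SaveThread() call, and Py_END_ALLOW_THREADS to a
PyEval_RestoreThread() call plus the closing brace. If the queue call in
between throws (Boost.Interprocess reports errors by throwing), the restore is
skipped and the thread goes back into Python without the GIL. An RAII guard
avoids that. The sketch below is only an illustration: it uses a std::mutex as
a stand-in for the GIL so it compiles without the Python headers, and
ScopedRelease / call_released are made-up names.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

// Releases a lock the caller holds for the lifetime of the guard and
// reacquires it in the destructor, so the lock is held again even if an
// exception escapes the guarded scope.
// A GIL version (assumption, not a Boost.Python class) would store
// PyThreadState* m_state = PyEval_SaveThread() in the constructor and call
// PyEval_RestoreThread(m_state) in the destructor.
class ScopedRelease {
public:
    explicit ScopedRelease(std::unique_lock<std::mutex>& lock) : m_lock(lock) {
        m_lock.unlock();
    }
    ~ScopedRelease() { m_lock.lock(); }

    ScopedRelease(const ScopedRelease&) = delete;
    ScopedRelease& operator=(const ScopedRelease&) = delete;

private:
    std::unique_lock<std::mutex>& m_lock;
};

// Runs a possibly throwing blocking call with the lock released.
template <class BlockingCall>
void call_released(std::unique_lock<std::mutex>& lock, BlockingCall blocking_call) {
    ScopedRelease release(lock);
    blocking_call();  // if this throws, ~ScopedRelease() still relocks
}
```

With the real GIL the guard would hold the PyThreadState* returned by
PyEval_SaveThread() instead of a lock reference; the structure stays the same.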

Note that there might be additional intricacies with regard to waking up the
blocking call when a message arrives in the queue and/or signal handling,
e.g. making the program interruptible while in the blocking call.
This depends on the queue implementation and your requirements (playing well
with signal handlers registered on the Python side, ...).
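Regarding interruptibility: one approach, assuming you can use a timed wait
such as Boost.Interprocess's timed_receive(), is to block only for short
slices and, after each empty slice, reacquire the GIL and call
PyErr_CheckSignals(). That function runs pending Python signal handlers (they
only run in the main thread) and returns -1 with the exception set if one
raised, e.g. KeyboardInterrupt on Ctrl-C; you can then propagate it with
boost::python::throw_error_already_set(). The self-contained sketch below
shows only the slicing logic; DemoQueue, wait_interruptibly and should_stop
are hypothetical, with a std::condition_variable in place of the IPC queue and
a caller-supplied predicate in place of the signal check.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical in-process queue standing in for the wrapped message queue.
struct DemoQueue {
    std::mutex mutex;
    std::condition_variable cond;
    std::deque<std::string> messages;

    void push(std::string msg) {
        {
            std::lock_guard<std::mutex> lock(mutex);
            messages.push_back(std::move(msg));
        }
        cond.notify_one();
    }
};

// Waits for a message in slices of `slice`. After each empty slice it calls
// should_stop(); in an extension module this is where you would reacquire
// the GIL and call PyErr_CheckSignals(). Returns true and moves the message
// into `out` if one arrived, false if should_stop() returned true.
template <class StopPredicate>
bool wait_interruptibly(DemoQueue& q, std::string& out,
                        std::chrono::milliseconds slice,
                        StopPredicate should_stop) {
    std::unique_lock<std::mutex> lock(q.mutex);
    for (;;) {
        // The predicate overload re-checks after spurious wakeups.
        if (q.cond.wait_for(lock, slice, [&q] { return !q.messages.empty(); })) {
            out = std::move(q.messages.front());
            q.messages.pop_front();
            return true;
        }
        lock.unlock();  // don't hold the queue lock while checking
        const bool stop = should_stop();
        lock.lock();
        if (stop) return false;
    }
}
```

The slice length is a trade-off: shorter slices notice Ctrl-C sooner but wake
up more often.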

The snippets below are samples from our wrapping of a message queue of the
commercial messaging middleware TIB/Rendezvous. It's old code and could be
improved, but it works for us and might just give you an idea, plus some
documentation.

Hope it helps,
Holger


// file helpers/debug_config.hpp
#ifndef DEBUG_CONFIG_HPP
#define DEBUG_CONFIG_HPP

// __GNUC__ is defined for both C and C++
#if defined(__GNUC__)
#define __PRETTY_FUNCTION__ __PRETTY_FUNCTION__
#elif (defined(__SUNPRO_C) || defined(__SUNPRO_CC))
#define __PRETTY_FUNCTION__ __func__
#else
#define __PRETTY_FUNCTION__ __func__
#endif // if defined(__GNUC__)

#include <iostream>


// See proposed solution by Graham Dumpleton here for debug macros:
// http://www.velocityreviews.com/forums/t280342-debugging-macros.html
// IMHO this has 2 advantages:
// - integrates much more nicely with code formatting than #ifdef DEBUG ... #endif
// - gets compiled even if DEBUG is unset, so the debug code gets checked
//   by the compiler (and optimized away, then, hopefully :)
//FIXME: Does this *really* get optimized away? For gcc (4.6.1) the unused
//FIXME: output code is left out (checked using the strings cmd), but for
//FIXME: Solaris Studio I can grep the debug code in the resulting object
//FIXME: file in a simple example program... Must we care?


// DEBUG switches on all debugging output
#ifdef DEBUG
#define DEBUG_TRACE
#define DEBUG_FUNC
#define DEBUG_CTOR
#define DEBUG_DTOR
#define DEBUG_MODULES
#endif


// use these switches to switch on specific debug tracing
#ifdef DEBUG_TRACE
#define DEBUGTRACER_TRACE if (0) ; else std::cout
#define DEBUGTRACER_CONDITIONAL if (0) ; else
#else
#define DEBUGTRACER_TRACE if (1) ; else std::cout
#define DEBUGTRACER_CONDITIONAL if (1) ; else
#endif

#ifdef DEBUG_FUNC
#define DEBUGTRACER_FUNC if (0) ; else std::cout
#else
#define DEBUGTRACER_FUNC if (1) ; else std::cout
#endif

#ifdef DEBUG_CTOR
#define DEBUGTRACER_CTOR if (0) ; else std::cout
#else
#define DEBUGTRACER_CTOR if (1) ; else std::cout
#endif

#ifdef DEBUG_DTOR
#define DEBUGTRACER_DTOR if (0) ; else std::cout
#else
#define DEBUGTRACER_DTOR if (1) ; else std::cout
#endif

#ifdef DEBUG_MODULES
#define DEBUGTRACER_MODULES if (0) ; else std::cout
#else
#define DEBUGTRACER_MODULES if (1) ; else std::cout
#endif

#endif //DEBUG_CONFIG_HPP
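The `if (0) ; else std::cout` form is what makes these macros safe to use as a
statement: the macro supplies its own `else`, so the trailing `<< ...;`
attaches to that branch, and an `else` the caller writes right after the
statement still pairs with the caller's own `if`. With a naive
`#define TRACE if (1) std::cout`, that `else` would bind to the macro's hidden
`if` instead. A minimal check, with a std::ostringstream substituted for
std::cout so the output can be inspected (TRACE_ON, TRACE_OFF, trace_out and
the two functions are illustrative names):

```cpp
#include <cassert>
#include <sstream>
#include <string>

std::ostringstream trace_out;  // stand-in for std::cout

// Same shape as DEBUGTRACER_TRACE with tracing switched on and off.
#define TRACE_ON  if (0) ; else trace_out
#define TRACE_OFF if (1) ; else trace_out

// The trace statement is directly followed by the caller's else; the
// return value shows which of the caller's branches actually ran.
std::string with_tracing(bool flag) {
    std::string branch = "if-branch";
    if (flag) TRACE_ON << "flag set;"; else branch = "else-branch";
    return branch;
}

std::string without_tracing(bool flag) {
    std::string branch = "if-branch";
    if (flag) TRACE_OFF << "never printed;"; else branch = "else-branch";
    return branch;
}
```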




// file tibrvqueue.hpp
// See tibrvqueue.cpp for extensive background documentation

#ifndef _ThreadedTibrvQueue_hpp
#define _ThreadedTibrvQueue_hpp

#include <tibrvcpp.h>
#include <cassert>
#include <list>
#include <signal.h>
#include <pthread.h>

#include "helpers/debug_config.hpp"

class ThreadedTibrvQueue : public TibrvQueue
{
    public:
        ThreadedTibrvQueue();
        void waitEvent();
        bool waitEvent(double timeout);

        TibrvStatus destroy();
        TibrvStatus destroy(TibrvQueueOnComplete* completeCB,
                            const void* closure = NULL);

    private:
        static void _myHook(tibrvQueue eventQueue, void* closure);
        static void sighandler(int);
        static void installSignalHandlers();
        static void removeSignalHandlers();
    private:
        pthread_cond_t m_cond;
        pthread_mutex_t m_mutex;
        typedef std::list<ThreadedTibrvQueue*> instance_list;
        static instance_list m_instances;
        static pthread_mutex_t m_instances_mutex;
        static int m_waiting;
        typedef void (*sighandler_t)(int);
        static sighandler_t m_sighandler[NSIG];
};

#endif
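The timed overload waitEvent(double timeout) has to turn a relative timeout in
seconds into the absolute CLOCK_REALTIME deadline that pthread_cond_timedwait()
expects; set_interval() in tibrvqueue.cpp does this. The same arithmetic,
isolated as a pure function so it can be checked without a clock, might look
like the sketch below. add_timeout is a hypothetical name, and a non-negative
timeout is assumed. The carry keeps tv_nsec in [0, 1e9), which
pthread_cond_timedwait() requires (it fails with EINVAL otherwise).

```cpp
#include <cassert>
#include <cmath>
#include <ctime>

// Returns now + timeout (seconds, assumed >= 0) as a normalised timespec,
// i.e. with 0 <= tv_nsec < 1e9.
// Typical use:  timespec now; clock_gettime(CLOCK_REALTIME, &now);
//               timespec deadline = add_timeout(now, 2.5);
timespec add_timeout(timespec now, double timeout) {
    const long long kNanosPerSec = 1000000000LL;
    double whole = 0.0;
    const double frac = std::modf(timeout, &whole);

    // Round rather than truncate, so floating-point error in frac * 1e9
    // cannot shave off a nanosecond.
    const long long nsec = static_cast<long long>(now.tv_nsec) + std::llround(frac * 1e9);

    timespec deadline = {};
    deadline.tv_sec = now.tv_sec + static_cast<time_t>(whole)
                      + static_cast<time_t>(nsec / kNanosPerSec);  // carry
    deadline.tv_nsec = static_cast<long>(nsec % kNanosPerSec);
    return deadline;
}
```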



// file tibrvqueue.cpp
// ThreadedTibrvQueue: A TibrvQueue subclass to enable multithreaded Python
// programs with Rendezvous
//
// As the TIB/Rv library does not know anything about Python, its
// functions/methods of course do not release the Python GIL, even in
// situations where this would be desirable or even essential. E.g.
//  * TibrvQueue::dispatch()
//  * TibrvQueue::timedDispatch()
// won't release the GIL before blocking (either indefinitely or for a set
// interval).
//
// This means that while waiting for a TIB/Rv event (an incoming UDP message,
// which is also I/O; a timer event; an I/O event (platform-dependent)) no
// other thread is allowed to run: the thread that called
// dispatch/timedDispatch (usually the main thread in its main loop) holds the
// GIL, blocking other threads from doing their work.
// Therefore, it is essential to release the GIL in such situations.
//
// The ThreadedTibrvQueue solves this by introducing
//  * the waitEvent() method, which
//    1. installs a signal handler to catch "break" signals
//    2. releases the GIL
//    3. starts waiting on the condition member variable m_cond (i.e. goes to
//       sleep)
//    and
//  * a hook callback method _myHook() (which gets called by the TIB/Rv event
//    mechanism whenever an event is put into the queue) that signals the
//    condition variable m_cond. This wakes up the waiting code in
//    waitEvent(), which then reacquires the GIL and removes the signal
//    handler.
// The signal handler is needed to make the program interruptible, e.g. by
// Ctrl-C, while waiting for an event to be put in the queue; it also signals
// the m_cond condition variable to allow waitEvent() to proceed.
//
// Used together with the normal dispatch/timedDispatch methods, this allows
// for running multithreaded programs. You have to use them in combination,
// i.e. each dispatch call must be preceded by a waitEvent() call, so your
// main Rv loop implementation will basically look like this:
//
// queue = ThreadedTibrvQueue()
// while 1:
//     queue.waitEvent()
//     queue.dispatch()
//
// Possible improvements:
//  * make waitEvent() a private method and override the
//    dispatch()/timedDispatch() methods (poll() also?) to call the private
//    waitEvent(). This would make for a better API.
//  * unify ThreadedTibrvQueue constructor behaviour with standard TibrvQueue
//    behaviour: ThreadedTibrvQueue performs a create() call in the
//    constructor, while TibrvQueue does not and you have to do this in a
//    second step
//
// Alternatives:
// It is possible in principle to run multithreaded programs with the
// standard Tibrv queue if you forgo blocking dispatch; however, caution is
// advised. Imagine you try to run a threaded program with another running
// thread, using a main thread loop like
//
// sys.setcheckinterval(1) # minimal "tick" count for thread switching
// queue = Tibrv.defaultQueue()
// while 1:
//     queue.timedDispatch(0.1)
//     print "..."
//
// This *will not work*, i.e. the other running thread will not be able to
// run properly! The reason is that although the main loop thread releases
// the GIL when timedDispatch() returns, it is itself waiting to reacquire it
// immediately. This means we now have 2 threads waiting to proceed: our
// other thread and the main thread wanting to continue with the next loop
// iteration. The way thread switching is implemented (at least up to Python
// 2.6), either one may be scheduled to run, depending on OS scheduler
// decisions (see http://www.dabeaz.com/python/UnderstandingGIL.pdf for a
// thorough explanation). This can result in the other thread never getting
// the chance to continue.
// It is still possible to support threaded programs with a construct like
//
// queue = Tibrv.defaultQueue()
// while 1:
//     queue.timedDispatch(0.1)
//     time.sleep(0.0001)
//
// Blocking I/O forces release of the GIL, as does time.sleep() (the Python C
// macro Py_BEGIN_ALLOW_THREADS basically does that: "hey, I'm entering some
// blocking operation, here's the GIL back" [quoted from
// http://jessenoller.com/2009/02/01/python-threads-and-the-global-interpreter-lock/,
// which is also an excellent introduction to threading and the GIL]). So
// this really gives our 2nd thread the possibility to run.
// Note, however, that while the main thread waits in timedDispatch(X) for X
// seconds
//  * no signals will be handled to allow program interruption
//  * the other thread(s) are not able to run
// which means X must be reasonably small. Consequently, such a solution
// would be suboptimal at best.

#include <boost/python.hpp> // silence warning: "_FILE_OFFSET_BITS" redefined
#include "tibrvqueue.hpp"
#include <queue.h>
#include <boost/python/errors.hpp>
//#include <boost/python/objects.hpp>
//#include <boost/python/conversions.hpp>
#include <boost/format.hpp>
#include <iostream>
#include <string>
#include <pythread.h>
#include <pthread.h>
#include <math.h>
#include <signal.h>
#include <string.h>
#include <errno.h>
#include <sys/time.h>
//#undef DEBUG
//#define DEBUG


#ifdef DEBUG
// FIXME: This is provided on Solaris but not Linux; is there some equivalent
// we could use instead?
#ifdef SIG2STR_MAX
std::ostream& operator<<(std::ostream& to, const sigset_t *sigset)
{
    char buf[SIG2STR_MAX];
    for (int s=0;s<MAXSIG;s++) {
        sig2str(s, buf);
        to << buf << "(" << s << "):" << (sigismember(sigset, s)) << std::endl;
    }
    return to;
}

std::ostream &operator<<(std::ostream &to,const struct sigaction& sa)
{
  to << "sa_flags=" << sa.sa_flags << "(";
  to << ( (sa.sa_flags & SA_SIGINFO ) ? "SA_SIGINFO " : "" );
  to << ( (sa.sa_flags & SA_RESTART ) ? "SA_RESTART " : "" );
  to << ( (sa.sa_flags & SA_ONSTACK ) ? "SA_ONSTACK " : "" );
  to << ( (sa.sa_flags & SA_NODEFER ) ? "SA_NODEFER " : "" );
  to << ( (sa.sa_flags & SA_RESETHAND ) ? "SA_RESETHAND " : "" );
  to << ") sa_handler=" << (void*)sa.sa_handler << " sa_sigaction="
     << (void*)sa.sa_sigaction;
  return to;
}

void dumpsigs()
{
    sigset_t sigset;
    pthread_sigmask(0, 0, &sigset);
    std::cout << "thread: " << pthread_self() << " sigset: " << &sigset << std::endl;

    struct sigaction sa;
    sigaction(SIGINT, 0, &sa);
    std::cout << "SIGINT:" << sa << std::endl;
    sigaction(SIGTERM, 0, &sa);
    std::cout << "SIGTERM:" << sa << std::endl;
    sigaction(SIGQUIT, 0, &sa);
    std::cout << "SIGQUIT:" << sa << std::endl;
}
#endif // ifdef SIG2STR_MAX
#endif // ifdef DEBUG

ThreadedTibrvQueue::instance_list ThreadedTibrvQueue::m_instances;
pthread_mutex_t ThreadedTibrvQueue::m_instances_mutex = PTHREAD_MUTEX_INITIALIZER;
int ThreadedTibrvQueue::m_waiting = 0;
ThreadedTibrvQueue::sighandler_t ThreadedTibrvQueue::m_sighandler[NSIG];


ThreadedTibrvQueue::ThreadedTibrvQueue() : TibrvQueue()
{
    // m_instances_mutex is statically initialized above: calling
    // pthread_mutex_init() again whenever m_instances is empty would
    // re-initialize an already initialized mutex (undefined behaviour).
    TibrvStatus status = create();
    if (status != TIBRV_OK) {
        throw std::runtime_error((boost::format("TibrvQueue::create: %1%")
                                  % status.getText()).str());
    }
    pthread_mutex_init(&m_mutex, 0);
    pthread_cond_init(&m_cond, 0);
    status = tibrvQueue_SetHook(getHandle(), &_myHook, this);
    if (status != TIBRV_OK) {
        throw std::runtime_error((boost::format("tibrvQueue_SetHook: %1%")
                                  % status.getText()).str());
    }
    // Register only once m_mutex/m_cond are usable, as sighandler() locks
    // them; holding m_instances_mutex prevents signal handlers from
    // iterating m_instances while it is being modified.
    pthread_mutex_lock( &m_instances_mutex );
    m_instances.push_back(this);
    pthread_mutex_unlock( &m_instances_mutex );
}


TibrvStatus ThreadedTibrvQueue::destroy()
{
    DEBUGTRACER_FUNC << "--> " << __PRETTY_FUNCTION__ << std::endl;
    return destroy(NULL, NULL);
}


// must not repeat closure's default arg value here:
// in tibrvqueue.hpp:  TibrvStatus destroy(TibrvQueueOnComplete* completeCB, const void* closure=NULL);
TibrvStatus ThreadedTibrvQueue::destroy(TibrvQueueOnComplete* completeCB,
                                        const void* closure)
{
    DEBUGTRACER_FUNC << "--> " << __PRETTY_FUNCTION__ << std::endl;

    if (getHandle() != TIBRV_INVALID_ID) {
        TibrvStatus status = tibrvQueue_RemoveHook(getHandle());
        // Unregister before destroying m_mutex/m_cond so that sighandler()
        // can no longer reach them.
        pthread_mutex_lock( &m_instances_mutex );
        m_instances.remove(this);
        pthread_mutex_unlock( &m_instances_mutex );
        pthread_cond_destroy(&m_cond);
        pthread_mutex_destroy(&m_mutex);
        if (status != TIBRV_OK) {
            throw std::runtime_error((boost::format("tibrvQueue_RemoveHook: %1%")
                                      % status.getText()).str());
        }
    }
    DEBUGTRACER_FUNC << "<-- " << __PRETTY_FUNCTION__ << std::endl;
    return TibrvQueue::destroy(completeCB, closure);
}


void ThreadedTibrvQueue::waitEvent()
{
    DEBUGTRACER_FUNC << "--> " << __PRETTY_FUNCTION__ << std::endl;

    tibrv_u32 available = 0;
    TibrvStatus status;
    int cond_status = 0;
    assert(m_waiting>=0);
    if (!m_waiting) installSignalHandlers();
    m_waiting++;

    Py_BEGIN_ALLOW_THREADS;
    DEBUGTRACER_TRACE << __PRETTY_FUNCTION__ << ": pthread_mutex_lock" << std::endl;

    pthread_mutex_lock( &m_mutex );

    status = getCount(available);
//     while (!available && cond_status == 0 && status == TIBRV_OK) {
    if (!available && status == TIBRV_OK) {
        cond_status = pthread_cond_wait(&m_cond, &m_mutex);
        status = getCount(available);
        DEBUGTRACER_TRACE << __PRETTY_FUNCTION__ << "available = " << available << std::endl;
    }
//     assert(available);
    DEBUGTRACER_TRACE << __PRETTY_FUNCTION__ << ": pthread_mutex_unlock" << std::endl;

    pthread_mutex_unlock( &m_mutex );
    Py_END_ALLOW_THREADS;

    m_waiting--;
    assert(m_waiting>=0);
    if (!m_waiting) removeSignalHandlers();

    if (status != TIBRV_OK) {
        throw std::runtime_error((boost::format("TibrvQueue::getCount: %1%")
                                  % status.getText()).str());
    }
    if (cond_status != 0) {
        //throw std::runtime_error(std::string("pthread_cond_wait failed"));
        // To raise a standard Python exception, either create a custom
        // exception and register an exception translator, or do it manually
        // like this:
        // FIXME: Should go to a helper lib that takes the message and the
        // exception type
        const char message[] = "pthread_cond_wait failed";
        PyErr_SetString(PyExc_OSError, message);
        boost::python::throw_error_already_set();
    }
#if 0
    if (!available) {
        boost::python::tuple exc_value(
            boost::python::ref(BOOST_PYTHON_CONVERSION::to_python(EINTR)),
            boost::python::ref(BOOST_PYTHON_CONVERSION::to_python(strerror(EINTR))));
        //throw boost::python::OSError(exc_value.reference().get());
        std::string message =
            (boost::format("%1%") % exc_value.reference().get()).str();
        PyErr_SetString(PyExc_OSError, message.c_str());
        boost::python::throw_error_already_set();
    }
#endif

    DEBUGTRACER_TRACE << __PRETTY_FUNCTION__ << ": " << available << std::endl;
}

static void set_interval(struct timespec* timeout_spec, double timeout)
{
    double integral, fractional;
    time_t sec; long nsec;
    fractional = modf(timeout, &integral);
    sec = (time_t)integral;
    nsec = (long)(fractional*1.0e9);

#ifdef sun
    clock_gettime(CLOCK_REALTIME, timeout_spec);
#else
    struct timeval tv;
    gettimeofday(&tv, 0);
    timeout_spec->tv_sec = tv.tv_sec;
    timeout_spec->tv_nsec = tv.tv_usec*1000;
#endif
    timeout_spec->tv_sec += (sec + (timeout_spec->tv_nsec + nsec) / 1000000000);
    timeout_spec->tv_nsec = (timeout_spec->tv_nsec + nsec) % 1000000000;
}


bool ThreadedTibrvQueue::waitEvent(double timeout)
{
    DEBUGTRACER_FUNC << "--> " << __PRETTY_FUNCTION__ << std::endl;

    tibrv_u32 available = 0;
    TibrvStatus status;
    int cond_status = 0;

    assert(m_waiting>=0);
    if (!m_waiting) installSignalHandlers();
    m_waiting++;

    Py_BEGIN_ALLOW_THREADS;
    pthread_mutex_lock( &m_mutex );
    struct timespec timeout_spec;

    set_interval(&timeout_spec, timeout);
    status = getCount(available);

//     while (!available && cond_status != ETIMEDOUT && status == TIBRV_OK) {
    if (!available && status == TIBRV_OK) {
        cond_status = pthread_cond_timedwait(&m_cond, &m_mutex, &timeout_spec);
        status = getCount(available);
    }

//     assert(available || cond_status == ETIMEDOUT || status != TIBRV_OK);
    pthread_mutex_unlock( &m_mutex );
    Py_END_ALLOW_THREADS;
    m_waiting--;
    assert(m_waiting>=0);
    if (!m_waiting) removeSignalHandlers();

    if (status != TIBRV_OK) {
        throw std::runtime_error((boost::format("TibrvQueue::getCount: %1%")
                                  % status.getText()).str());
    }
    if (cond_status != 0 && cond_status != ETIMEDOUT) {
        // FIXME: Should go to a helper lib
        const char message[] = "pthread_cond_timedwait failed";
        PyErr_SetString(PyExc_OSError, message);
        boost::python::throw_error_already_set();
    }

    DEBUGTRACER_FUNC << "<-- " << __PRETTY_FUNCTION__ << ": " << available << std::endl;

    return available != 0;
}

void ThreadedTibrvQueue::_myHook( tibrvQueue eventQueue, void* closure)
{
    DEBUGTRACER_FUNC << "--> " << __PRETTY_FUNCTION__ << std::endl;

    ThreadedTibrvQueue* queue = (ThreadedTibrvQueue*)closure;
    assert(queue->getHandle() == eventQueue);
    pthread_mutex_lock( &(queue->m_mutex) );
    pthread_cond_signal( &(queue->m_cond) );
    pthread_mutex_unlock( &(queue->m_mutex) );

    DEBUGTRACER_FUNC << "<-- " << __PRETTY_FUNCTION__ << std::endl;
}

void ThreadedTibrvQueue::sighandler(int signum)
{
    DEBUGTRACER_FUNC << "--> " << __PRETTY_FUNCTION__ << std::endl;
    DEBUGTRACER_TRACE << "signal " << signum << std::endl;

    pthread_mutex_lock( &m_instances_mutex );
    for (instance_list::iterator iter = m_instances.begin();
         iter != m_instances.end(); ++iter) {
        ThreadedTibrvQueue* queue = *iter;
        pthread_mutex_lock( &(queue->m_mutex) );
        DEBUGTRACER_TRACE << __PRETTY_FUNCTION__ << ": pthread_cond_signal" << std::endl;
        pthread_cond_signal( &(queue->m_cond) );

        DEBUGTRACER_TRACE << __PRETTY_FUNCTION__ << ": pthread_mutex_unlock" << std::endl;
        pthread_mutex_unlock( &(queue->m_mutex) );
    }
    pthread_mutex_unlock( &m_instances_mutex );
    sighandler_t oldhandler = m_sighandler[signum];

#ifdef DEBUG
#ifdef SIG2STR_MAX
    char buf[SIG2STR_MAX];
    sig2str(signum, buf);
    DEBUGTRACER_TRACE << __PRETTY_FUNCTION__ << "oldhandler for " << buf
                      << "=" << (void*)(oldhandler) << std::endl;
#endif // ifdef SIG2STR_MAX
#endif // ifdef DEBUG
    if (oldhandler != SIG_DFL && oldhandler != SIG_IGN && oldhandler != SIG_ERR
#ifdef SIG_HOLD
        && oldhandler != SIG_HOLD
#endif
        ) {
        (*oldhandler)(signum);
    }
    DEBUGTRACER_FUNC << "<-- " << __PRETTY_FUNCTION__ << std::endl;

}


void ThreadedTibrvQueue::installSignalHandlers()
{
    DEBUGTRACER_FUNC << "--> " << __PRETTY_FUNCTION__ << std::endl;

    m_sighandler[SIGINT] = signal(SIGINT, sighandler);
    m_sighandler[SIGTERM] = signal(SIGTERM, sighandler);
    m_sighandler[SIGQUIT] = signal(SIGQUIT, sighandler);

    DEBUGTRACER_FUNC << "<-- " << __PRETTY_FUNCTION__ << std::endl;
}

void ThreadedTibrvQueue::removeSignalHandlers()
{
    DEBUGTRACER_FUNC << "--> " << __PRETTY_FUNCTION__ << std::endl;

    signal(SIGINT, m_sighandler[SIGINT]);
    signal(SIGTERM, m_sighandler[SIGTERM]);
    signal(SIGQUIT, m_sighandler[SIGQUIT]);

    DEBUGTRACER_FUNC << "<-- " << __PRETTY_FUNCTION__ << std::endl;
}
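A caveat on sighandler() above: POSIX does not list pthread_mutex_lock() or
pthread_cond_signal() among the async-signal-safe functions, so calling them
from a signal handler can deadlock, e.g. if the signal is delivered to a
thread that is currently holding m_instances_mutex. A common workaround is
the self-pipe trick: the handler only write()s a byte to a pipe (write() is
async-signal-safe) and the waiting code blocks in poll() on the read end. The
minimal POSIX sketch below uses sigaction(), which also returns the previous
action so it can be restored; it is not a drop-in replacement for
sighandler(), and wake_pipe, on_wakeup_signal, install_wakeup_handler and
wait_for_wakeup are illustrative names.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <unistd.h>

static int wake_pipe[2] = {-1, -1};  // [0]: read end, [1]: write end

// Signal handler: only async-signal-safe work (write() one byte, keep errno).
extern "C" void on_wakeup_signal(int) {
    const int saved_errno = errno;
    const char byte = 1;
    const ssize_t ignored = write(wake_pipe[1], &byte, 1);  // pipe is non-blocking
    (void)ignored;
    errno = saved_errno;
}

// Creates the pipe and installs the handler for signum, saving the previous
// action in *old_action so it can be restored later with sigaction().
bool install_wakeup_handler(int signum, struct sigaction* old_action) {
    if (pipe(wake_pipe) != 0) return false;
    for (int fd : wake_pipe) {
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
        fcntl(fd, F_SETFD, FD_CLOEXEC);
    }
    struct sigaction sa = {};
    sa.sa_handler = on_wakeup_signal;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = SA_RESTART;
    return sigaction(signum, &sa, old_action) == 0;
}

// Waits up to timeout_ms for a wake-up byte and drains the pipe.
// Returns false on timeout or poll() error (including EINTR).
bool wait_for_wakeup(int timeout_ms) {
    struct pollfd pfd = {wake_pipe[0], POLLIN, 0};
    if (poll(&pfd, 1, timeout_ms) <= 0) return false;
    char buf[64];
    while (read(wake_pipe[0], buf, sizeof buf) > 0) {
        // discard: we only care that something was written
    }
    return true;
}
```

In ThreadedTibrvQueue the waiter would then have to wait on the pipe rather
than on m_cond alone, or a helper thread could read the pipe and signal
m_cond on the handler's behalf; either way the handler itself stays
lock-free.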




