Python extension module with callbacks into Python

Barry Scott barry at barrys-emacs.org
Wed Oct 28 08:23:22 EDT 2020



> On 27 Oct 2020, at 19:21, Paul Grinberg <gri6507 at gmail.com> wrote:
> 
> As full disclosure, I posted this question on StackOverflow as well, but it looks like questions with [Python] [Extension-Module] tags are not frequently answered. The link to my question there is https://stackoverflow.com/questions/64559322/python-extension-module-with-callbacks-into-python
> 
> I am running into unpredictable behavior with my Python extension module that wraps around a C++ library that starts a new pthread and, after doing some work, generates callbacks back into the caller. I've greatly simplified this to a simplistic example which still demonstrates this problem. The following will sometimes generate a Fatal Python error: PyEval_SaveThread: NULL tstate, usually rather quickly. Sometimes it SIGSEGV on tupledealoc. Occasionally this deadlocks. I am at a loss why. Does anyone have any ideas?


You did not say which OS and Python version you are working with.

Why do you need to call PyEval_ThreadsInitialized()? It's deprecated.

Py_Initialize() has set up threads unconditionally since Python 3.7.

When I have been in a similar situation, I have used the debugger to
find out what the CPython code is expecting and worked backwards from there.

Reading the Python sources helps a lot.

Barry



> 
> The relevant code for this is below, between the ==== sections ====
> 
> === python program ===
> 
> import mymod
> from time import sleep
> from random import randrange
> 
> def my_cb1(s):
>    print("Python cb %s" % (s));
> 
> for x in range(1,1000):
>    num_cb = randrange(5) + 1
>    print("Starting %d" % mymod.doit(my_cb1, "myid" + str(x), num_cb))
> 
> while True:
>    sleep(1)
> 
> === Extension Module ===
> 
> #include <pthread.h>
> 
> #define PY_SSIZE_T_CLEAN
> #include <Python.h>
> #include <stddef.h>
> 
> #include <iostream>
> #include <map>
> #include <deque>
> #include <mutex>
> #include <functional>
> #include <thread>
> 
> static std::map<std::string, PyObject *> cb_map;
> static std::mutex map_mtx;
> 
> struct fake_cb_info
> {
>  fake_cb_info() = delete;
>  fake_cb_info(const unsigned long &num_cb, const std::string &id) :
>    num_cb(num_cb), id(id)
>  {
>  }
>  const unsigned long num_cb;
>  const std::string id;
> };
> static std::deque<struct fake_cb_info> deq;
> static std::mutex deq_mtx;
> 
> static bool is_worker_thread_running = false;
> static std::thread worker_thread;
> 
> typedef std::function<void(const std::string &id, const std::string &s)> doit_cb_t;
> static void internal_cb(const std::string &id, const std::string &s)
> {
>  std::scoped_lock<std::mutex> lk(map_mtx);
> 
>  if (0 != cb_map.count(id))
>  {
>      PyGILState_STATE gstate;
>      gstate = PyGILState_Ensure();
> 
>      PyObject *arglist = Py_BuildValue("(s)", s.c_str());
>      PyObject *result = PyObject_CallObject(cb_map.at(id), arglist);
>      Py_DECREF(arglist);
> 
>      if (NULL == result)
>      {
>          if (NULL == PyErr_Occurred())
>          {
>            std::cerr << "Unknown error occurred in C callback" << std::endl;
>          }
>          else
>          {
>              PyErr_Print();
>          }
>      }
>      else
>      {
>        Py_DECREF(result);
>      }
> 
>      PyGILState_Release(gstate);
>  }
>  else
>  {
>    std::cerr << "Unknown callback id " << id << std::endl;
>  }
> }
> 
> void static worker()
> {
>  size_t x = 0;
>  while(true)
>  {
>    std::scoped_lock<std::mutex> lk(deq_mtx);
>    if (deq.size() == 0)
>    {
>      usleep(1000);
>      continue;
>    }
> 
>    auto info = deq.front();
>    deq.pop_front();
>    for (unsigned long i=0; i<info.num_cb; i++)
>    {
>      internal_cb(info.id, std::to_string(x++));
>    }
>  }
> }
> 
> PyObject * _wrap_doit(void *self, PyObject *args, PyObject *kwargs)
> {
>    PyObject *py_retval;
>    PyThreadState *py_thread_state = NULL;
>    PyObject *cb;
>    const char *id = NULL;
>    Py_ssize_t id_len;
>    std::string id_std;
>    unsigned long num_callbacks;
>    const char *keywords[] = {"cb_func", "id", "num_cb", NULL};
> 
>    if (!PyArg_ParseTupleAndKeywords(args, kwargs, (char *) "Os#k", (char **) keywords, &cb, &id, &id_len, &num_callbacks))
>    {
>        abort();
>    }
>    if (!PyCallable_Check(cb))
>    {
>        abort();
>    }
> 
>    id_std = std::string(id, id_len);
> 
>    {
>      std::scoped_lock<std::mutex> lk(map_mtx);
>      if (0 == cb_map.count(id_std))
>      {
>        Py_INCREF(cb);
>        cb_map.insert(std::make_pair(id_std, cb));
> 
>        // N.B. The corresponding Py_DECREF for the callback function PyObject
>        // is intentionally not here. It is in another extension module method
>        // that is not listed here (just trying to keep this example as small
>        // and lean as possible)
>      }
>      else
>      {
>        std::cerr << "Only one callback for ID!" << std::endl;
>        abort();
>      }
>    }
> 
>    if (PyEval_ThreadsInitialized ())
>    {
>      std::cout << "Saving thread" << std::endl;
>      py_thread_state = PyEval_SaveThread();
>    }
> 
>    {
>      // Stash away the info so that we will know how many callbacks to
>      // generate and sleep a bit. This is to simulate a real external library
>      // doing work which will, in turn, generate callbacks
>      struct fake_cb_info info(num_callbacks, id_std);
>      std::scoped_lock<std::mutex> lk(deq_mtx);
>      deq.push_back(info);
> 
>      if (!is_worker_thread_running)
>      {
>        std::cout << "@@@@ Creating a new thread\n";
>        worker_thread = std::thread(&worker);
>        pthread_setname_np(worker_thread.native_handle(), "worker_thread");
>        worker_thread.detach();
>        is_worker_thread_running = true;
>      }
> 
>      usleep(10000);
>    }
> 
>    if (py_thread_state)
>    {
>      std::cout << "Restoring thread" << std::endl;
>      PyEval_RestoreThread(py_thread_state);
>    }
> 
>    py_retval = Py_BuildValue((char *) "k", num_callbacks);
>    return py_retval;
> }
> 
> static PyMethodDef mymod_functions[] = {
>    {
>      (char *) "doit",
>      (PyCFunction) _wrap_doit,
>      METH_KEYWORDS | METH_VARARGS,
>      "Generate requested number of multi-threaded callbacks.\n doit(callback_fn, id, num_callbacks)"
>    },
>    {NULL, NULL, 0, NULL}
> };
> 
> static struct PyModuleDef moduledef = {
>    PyModuleDef_HEAD_INIT,
>    "mymod",
>    "pthread test module",
>    -1,
>    mymod_functions,
> };
> 
> #define MOD_ERROR NULL
> #define MOD_INIT(name) PyObject* PyInit_##name(void)
> #define MOD_RETURN(val) val
> 
> #if defined(__cplusplus)
> extern "C"
> #endif
> 
> #if defined(__GNUC__) && __GNUC__ >= 4
> __attribute__ ((visibility("default")))
> #endif
> 
> 
> MOD_INIT(mymod)
> {
>    PyObject *m = PyModule_Create(&moduledef);
>    if (m == NULL) {
>        return MOD_ERROR;
>    }
>    return MOD_RETURN(m);
> }
> -- 
> https://mail.python.org/mailman/listinfo/python-list
> 



More information about the Python-list mailing list