parallel (concurrent) eventlet

David Gabriel davidgab283 at gmail.com
Mon Jan 18 10:03:23 EST 2016


Dear all,

Let me add one more detail: when I add the following two lines to check
whether my modules are monkey-patched, I get *False* for both.
I find this strange, since I patched my modules at the beginning using
eventlet.monkey_patch(), as described here
<http://eventlet.net/doc/patching.html#import-green>.

print "*****is_monkey_patched(ldap.syncrepl) : %s*****" % (
    eventlet.patcher.is_monkey_patched('ldap.syncrepl'))
print "*****is_monkey_patched(ldap.ldapobject) : %s*****" % (
    eventlet.patcher.is_monkey_patched('ldap.ldapobject'))


Please advise me how to fix this issue.
Kind regards.


2016-01-18 12:03 GMT+01:00 David Gabriel <davidgab283 at gmail.com>:

> Dear all,
>
> I have an issue when I use the eventlet API to create parallel threads.
> When I run the code below, only the function that handles
> synchronization with the LDAP database runs, and it continuously blocks
> the others from running.
> However, when I use the 'thread' API, the program works fine without any
> blocking. Unfortunately, I cannot use the thread API and must use
> eventlet.
> So I am wondering how to get the behavior of the thread API using the
> eventlet API.
>
> Could you please tell me how to fix this issue?
>
> My code is below.
> Note that it needs some LDAP server/client configuration to run.
>
>
> #!/usr/bin/python
> # -*- coding: utf-8 -*-
> """
> This script implements a syncrepl consumer which syncs data from an
> OpenLDAP
> server to a local (shelve) database.
> Notes:
> The bound user needs read access to the attributes entryDN and entryCSN.
> This needs the following software:
> Python
> pyasn1 0.1.4+
> pyasn1-modules
> python-ldap 2.4.10+
> """
>
> # Import the python-ldap modules
> import ldap,ldapurl
> # Import specific classes from python-ldap
> from ldap.ldapobject import ReconnectLDAPObject
> from ldap.syncrepl import SyncreplConsumer
>
> # Import modules from Python standard lib
> import shelve,signal,time,sys,logging
> import eventlet
> #import thread
> eventlet.monkey_patch()
>
> # Global state
> watcher_running = True
> ldap_connection = False
>
>
>
> class SyncReplConsumer(ReconnectLDAPObject,SyncreplConsumer):
>     """
>     Syncrepl Consumer interface
>     """
>     def __init__(self,db_path,*args,**kwargs):
>         # Initialise the LDAP Connection first
>         ldap.ldapobject.ReconnectLDAPObject.__init__(self, *args, **kwargs)
>         # Now prepare the data store
>         self.__data = shelve.open(db_path, 'c')
>         # We need this for later internal use
>         self.__presentUUIDs = dict()
>
>     def __del__(self):
>             # Close the data store properly to avoid corruption
>             self.__data.close()
>
>     def syncrepl_get_cookie(self):
>         if 'cookie' in self.__data:
>             return self.__data['cookie']
>
>     def syncrepl_set_cookie(self,cookie):
>         self.__data['cookie'] = cookie
>
>     def syncrepl_entry(self,dn,attributes,uuid):
>
>         # First we determine the type of change we have here (and store
>         # away the previous data for later if needed)
>         previous_attributes = dict()
>         if uuid in self.__data:
>             change_type = 'modify'
>             previous_attributes = self.__data[uuid]
>         else:
>             change_type = 'add'
>         # Now we store our knowledge of the existence of this entry
>         # (including the DN as an attribute for convenience)
>         attributes['dn'] = dn
>         self.__data[uuid] = attributes
>         # Debugging
>         print 'Detected', change_type, 'of entry:', dn
>         # If we have a cookie then this is not our first time being run,
>         # so it must be a change
>         if 'ldap_cookie' in self.__data:
>                 self.perform_application_sync(dn, attributes,
>                                               previous_attributes)
>
>     def syncrepl_delete(self,uuids):
>         # Make sure we know about the UUID being deleted, just in case...
>         uuids = [uuid for uuid in uuids if uuid in self.__data]
>         # Delete all the UUID values we know of
>         for uuid in uuids:
>             print 'Detected deletion of entry:', self.__data[uuid]['dn']
>             del self.__data[uuid]
>
>     def syncrepl_present(self,uuids,refreshDeletes=False):
>         # If we have not been given any UUID values, then we have received
>         # all the present controls...
>         if uuids is None:
>             # We only do things if refreshDeletes is false as the syncrepl
>             # extension will call syncrepl_delete instead when it detects a
>             # delete notice
>             if refreshDeletes is False:
>                 deletedEntries = [uuid for uuid in self.__data.keys() if
>                                   uuid not in self.__presentUUIDs and uuid != 'ldap_cookie']
>                 self.syncrepl_delete( deletedEntries )
>             # Phase is now completed, reset the list
>             self.__presentUUIDs = {}
>         else:
>             # Note down all the UUIDs we have been sent
>             for uuid in uuids:
>                     self.__presentUUIDs[uuid] = True
>
>     def perform_application_sync(self,dn,attributes,previous_attributes):
>         print 'Performing application sync for:', dn
>         return True
>
>
> # Shutdown handler
> #def commenceShutdown(signum, stack):
> def commenceShutdown():
>     # Declare the needed global variables
>     global watcher_running, ldap_connection
>     print 'Shutting down!'
>
>     # We are no longer running
>     watcher_running = False
>
>     # Tear down the server connection
>     if( ldap_connection ):
>             del ldap_connection
>
>     # Shutdown
>     sys.exit(0)
>
> def mainOfSyncrepl(threadName):
>      # Time to actually begin execution
>      # Install our signal handlers
> #     signal.signal(signal.SIGTERM,commenceShutdown)
> #     signal.signal(signal.SIGINT,commenceShutdown)
>      try:
>        ldap_url = ldapurl.LDAPUrl(
>            'ldap://localhost/dc=example,dc=org?*?sub?(objectClass=*)?bindname=cn=admin%2cdc=test%2cdc=com,X-BINDPW=myPassword')#ldapurl.LDAPUrl(sys.argv[1])
>      #  ldap_url = ldapurl.LDAPUrl(link)
>        database_path = 'test.com'#sys.argv[2]
>      #  database_path = pathName
>      except IndexError,e:
>        print 'Usage: syncrepl-client.py <LDAP URL> <pathname of database>'
>        sys.exit(1)
>      except ValueError,e:
>        print 'Error parsing command-line arguments:',str(e)
>        sys.exit(1)
>
>      while watcher_running:
>          print 'Connecting to LDAP server now...'
>          # Prepare the LDAP server connection (triggers the connection as
>          # well)
>          ldap_connection = SyncReplConsumer(database_path,ldap_url.initializeUrl())
>
>          # Now we login to the LDAP server
>          try:
>              ldap_connection.simple_bind_s(ldap_url.who,ldap_url.cred)
>          except ldap.INVALID_CREDENTIALS, e:
>              print 'Login to LDAP server failed: ', str(e)
>              sys.exit(1)
>          except ldap.SERVER_DOWN:
>              continue
>
>          # Commence the syncing
>          print 'Commencing sync process'
>          ldap_search = ldap_connection.syncrepl_search(
>            ldap_url.dn or '',
>            ldap_url.scope or ldap.SCOPE_SUBTREE,
>            mode = 'refreshAndPersist',
>            filterstr = ldap_url.filterstr or '(objectClass=*)')
>          print 'After syncrepl_search.'
>          try:
>              while ldap_connection.syncrepl_poll( all = 1, msgid = ldap_search):
>                   pass
>          except KeyboardInterrupt:
>           # User asked to exit
>              commenceShutdown()
>              pass
>          except Exception, e:
>           # Handle any exception
>              if watcher_running:
>                  print 'Encountered a problem, going to retry. Error:', str(e)
>                  eventlet.sleep(5)
>              pass
>
> # Define a function for the 2nd thread
> def print_time(ThreadName):
>      count = 0
>      delay = 3
>      while 1:#count < 5:
>          count += 1
>          print "%s: %s" % (ThreadName, time.ctime(time.time()) )
>          eventlet.sleep(delay)
>
>
>
> print 'Before call threads'
>
> evt1 = eventlet.spawn(mainOfSyncrepl, "Thread-1",)
> evt2 = eventlet.spawn(print_time, "Thread-2",)
> evt3 = eventlet.spawn(print_time, "Thread-3",)
>
> print 'After call threads'
>
> evt1.wait()
> evt2.wait()
> evt3.wait()
>
> print 'After wait'
>
>
>
> 2016-01-12 7:20 GMT-08:00 Ned Batchelder <ned at nedbatchelder.com>:
>
>> David,
>>
>> We aren't going to be able to debug code that we can't see.  Please post
>> a link to the *actual* code that you are running.
>>
>> --Ned.
>>
>>
>> On 1/12/16 7:00 AM, David Gabriel wrote:
>>
>> Dear all,
>>
>> For more details, I am using this code
>> <https://github.com/rbarrois/python-ldap/blob/master/Demo/pyasn1/syncrepl.py>
>> to receive the updates from my database.
>> However, when I create a green thread based on this code, my program
>> blocks there and does not run the other green threads.
>> Could you please advise me how to fix this issue?
>>
>> Thanks in advance.
>> Regards
>>
>> 2016-01-11 7:29 GMT-08:00 David Gabriel <davidgab283 at gmail.com>:
>>
>>> Thanks so much, John.
>>> Your proposal works fine for this simple example, but when I use
>>> it for more complex code (a database client that receives all updates
>>> from the database), my program runs only this database client and never
>>> the other tasks.
>>>
>>> Any suggestions?
>>> Thanks in advance.
>>> Regards
>>>
>>> 2016-01-11 5:50 GMT-08:00 John Eskew <john.eskew at gmail.com>:
>>>
>>>> Add this line below your imports:
>>>>
>>>> eventlet.monkey_patch()
>>>>
>>>> Here's why that line should fix things:
>>>>
>>>> http://eventlet.net/doc/patching.html#greening-the-world
>>>>
>>>> On Mon, Jan 11, 2016 at 6:27 AM, David Gabriel
>>>> <davidgab283 at gmail.com> wrote:
>>>>
>>>>> Dear all,
>>>>> This is the first time I am developing with Python, and I want to
>>>>> execute parallel threads using eventlet. When I run the code below,
>>>>> only one thread is executed, not both. Could you please tell me how to
>>>>> fix this issue?
>>>>> Please advise me how to ensure concurrent behavior between evt1 and
>>>>> evt2.
>>>>>
>>>>> import eventlet
>>>>> import time
>>>>>
>>>>> def print_time(ThreadName):
>>>>>         count = 0
>>>>>         delay = 3
>>>>>         while 1:#count < 5:
>>>>>             count += 1
>>>>>             print "%s: %s" % (ThreadName, time.ctime(time.time()) )
>>>>>             time.sleep(delay)
>>>>>
>>>>> print 'Before call threads'
>>>>>
>>>>> evt1 = eventlet.spawn(print_time, "Thread-1",)
>>>>> evt2 = eventlet.spawn(print_time, "Thread-2",)
>>>>>
>>>>> print 'After call threads'
>>>>>
>>>>> evt1.wait()
>>>>> evt2.wait()
>>>>>
>>>>>
>>>>>
>>>>> Any answer is welcome.
>>>>> Thanks in advance.
>>>>> Regards.
>>>>>
>>>>> _______________________________________________
>>>>> Boston mailing list
>>>>> Boston at python.org
>>>>> https://mail.python.org/mailman/listinfo/boston
>>>>>
>>>>>
>>>>
>>>
>>
>>
>
>


