Example Code : Shared Memory with Mutex (pywin32 and ctypes)

Srijit Kumar Bhadra srijit at yahoo.com
Sat Apr 2 11:13:09 EST 2005


Hello,
Here is some sample code with pywin32 build 203 and ctypes 0.9.6.

Best regards,
/Srijit

File: SharedMemCreate_Mutex_win32all.py

# This application should be used with SharedMemAccess_Mutex_ctypes.py
or SharedMemAccess_Mutex_win32all.py
#~ a) Creates a shared memory
#~ b) Creates or Opens a mutex
#~ c) Reads the contents (null terminated string) of shared memory
#~ d) Acquires a mutex and then writes a null terminated string to the
shared memory
#~ e) Sleeps upto 2 seconds. Sleep time is generated by a random number
#~ f) Repeats steps (c), (d) and (e) indefintely

import mmap, random, time
import win32event, pywintypes, win32api

def main():
    SHMEMSIZE = 256

    ERROR_ALREADY_EXISTS = 183

    szName = "MyFileMappingObject"
    szMsg1 = "Message from first process using win32all and mmap - "
    szMutex = "MyMutexObject"

    shmem = mmap.mmap(0, SHMEMSIZE, szName, mmap.ACCESS_WRITE)

    try:
        hMutex = win32event.CreateMutex(None, pywintypes.FALSE,
szMutex)
        if (win32api.GetLastError() == ERROR_ALREADY_EXISTS):
                print"Opened existing mutex object", szMutex
        else:
            print "Created new mutex"

        i=0
        random.seed()
        while 1:
            szMsg1 = szMsg1 + hex(i) + "\0"
            if (len(szMsg1) > SHMEMSIZE):
                print "Current size of string message is", len(szMsg1),
"and greater than", SHMEMSIZE
                break
            shmem_read = shmem.read(SHMEMSIZE)
            shmem_read = shmem_read.rstrip(chr(0))
            shmem_read = shmem_read.rstrip(" ")
            print "RECEIVED from SECOND Process: ", shmem_read
            shmem.seek(0)
            wait_result = win32event.WaitForSingleObject(hMutex, 1000)
            if (wait_result == win32event.WAIT_OBJECT_0):
                shmem.write(szMsg1)
                while (shmem.tell() != SHMEMSIZE):
                    shmem.write_byte(" ")
                shmem.seek(0)
                print "WROTE in FIRST process: ", szMsg1
                win32event.ReleaseMutex(hMutex)
            elif(wait_result == win32event.WAIT_TIMEOUT):
                 print "COULD NOT ACQUIRE MUTEX. TIMEOUT OCCURED"
            elif (wait_result == win32event.WAIT_ABONDONED):
                 print "WAIT ABANDONED"
            i = i + 1
            szMsg1 = "Message from first process using win32all and
mmap - "
            time.sleep(random.random()*2)

    except pywintypes.error, (errno, object, strerror):
            print "Error in", object, ":", strerror

if __name__ == "__main__":
    main()


File: SharedMemCreate_Mutex_ctypes.py

# This application should be used with SharedMemAccess_Mutex_ctypes.py
or SharedMemAccess_Mutex_win32all.py
#~ a) Creates a shared memory
#~ b) Creates or Opens a mutex
#~ c) Reads the contents (null terminated string) of shared memory
#~ d) Acquires a mutex and then writes a null terminated string to the
shared memory
#~ e) Sleeps upto 2 seconds. Sleep time is generated by a random number
#~ f) Repeats steps (c), (d) and (e) indefintely


# There are two options to implement this code - Option A or Option B.
If Option B(A) is chosen then
# Option A(B) should be commented.
import random, time
from ctypes import *

def main():
    try:
        SHMEMSIZE = 256

        TRUE = 1
        FALSE = 0
        ERROR_ALREADY_EXISTS = 183
        FILE_MAP_ALL_ACCESS = 0xF001F
        WAIT_OBJECT_0 = 0
        WAIT_TIMEOUT = 0x102
        WAIT_ABANDONED = 0x80
        PAGE_READWRITE = 0x04
        INVALID_HANDLE_VALUE = 0xFFFFFFFF

        szName = "MyFileMappingObject"
        szMsg1 = "Message from first process using ctypes - "
        szMutex = "MyMutexObject"

        hMutex = windll.kernel32.CreateMutexA(None, FALSE, szMutex)
        if (hMutex == 0):
            raise WinError()
        elif (windll.kernel32.GetLastError() == ERROR_ALREADY_EXISTS):
            print"Opened existing mutex object", szMutex
        else:
            print "Created new mutex"

        hMap = windll.kernel32.CreateFileMappingA(INVALID_HANDLE_VALUE,
None, PAGE_READWRITE, 0, SHMEMSIZE, szName)
        if (hMap == 0):
            print "Could not open file mapping object"
            raise WinError()

        MapViewOfFile = windll.kernel32.MapViewOfFile
        MapViewOfFile.restype = POINTER(c_char) # Option A
        pBuf = MapViewOfFile(hMap, FILE_MAP_ALL_ACCESS, 0, 0, 0)
        if (pBuf == 0):
            raise WinError()

        i=0
        random.seed()
        while 1:
            szMsg1 = szMsg1 + hex(i) + "\0"
            pBuf_str = cast(pBuf, c_char_p) # Option A
            if (len(szMsg1) > SHMEMSIZE):
                print "Current size of string message is", len(szMsg1),
"and greater than", SHMEMSIZE
                break
            print "RECEIVED from SECOND Process: ", pBuf_str.value #
Option A
            #~ print "RECEIVED from SECOND Process: ", string_at(pBuf)
# Option B
            wait_result = windll.kernel32.WaitForSingleObject(hMutex,
1000)
            if (wait_result == WAIT_OBJECT_0):
                memset(pBuf, ord(" "), SHMEMSIZE)
                cdll.msvcrt.strcpy(pBuf_str, szMsg1) # Option A
                #~ cdll.msvcrt.strcpy(pBuf, szMsg1) # Option B
                print "WROTE in FIRST process: ", szMsg1
                release_mutex = windll.kernel32.ReleaseMutex(hMutex)
                if (release_mutex == 0):
                    print "CANNOT RELEASE ACQUIRED MUTEX"
            elif(wait_result == WAIT_TIMEOUT):
                 print "COULD NOT ACQUIRE MUTEX. TIMEOUT OCCURED"
            elif (wait_result == WAIT_ABONDONED):
                 print "WAIT ABANDONED"
            i = i + 1
            szMsg1 = "Message from first process using ctypes - "
            time.sleep(random.random()*2)

        windll.kernel32.UnmapViewOfFile(pBuf)
        windll.kernel32.CloseHandle(hMap)
        return
    except WindowsError, (strerror):
        print strerror
        if (pBuf != 0):
            windll.kernel32.UnmapViewOfFile(pBuf)
        if (hMap != 0):
            windll.kernel32.CloseHandle(hMap)
        return

if __name__ == "__main__":
    main()


File : SharedMemAccess_Mutex_win32all.py

# This application should be used with SharedMemCreate_Mutex_ctypes.py
or SharedMemCreate_Mutex_win32all.py
#~ a) Opens an existing shared memory
#~ b) Opens an existing mutex
#~ c) Reads the contents (null terminated string) of shared memory
#~ d) Acquires a mutex and then writes a null terminated string to the
shared memory
#~ e) Sleeps upto 2 seconds. Sleep time is generated by a random number
#~ f) Repeats steps (c), (d) and (e) indefintely

import mmap, random, time
import win32event, pywintypes

def main():
    SHMEMSIZE = 256

    STANDARD_RIGHTS_REQUIRED = 0xF0000
    SYNCHRONIZE = 0x100000
    MUTANT_QUERY_STATE = 0x1

    MUTEX_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE |
MUTANT_QUERY_STATE

    szName = "MyFileMappingObject"
    szMsg1 = "Message from second process using win32all and mmap - "
    szMutex = "MyMutexObject"

    shmem = mmap.mmap(0, SHMEMSIZE, szName, mmap.ACCESS_WRITE)
    try:
        hMutex = win32event.OpenMutex(MUTEX_ALL_ACCESS,
pywintypes.FALSE, szMutex)
        print"Opened existing mutex object", szMutex
        i = 0;
        random.seed()
        while 1:
            szMsg1 = szMsg1 + hex(i) + "\0"
            if (len(szMsg1) > SHMEMSIZE):
                print "Current size of string message is", len(szMsg1),
"and greater than", SHMEMSIZE
                break
            shmem_read = shmem.read(SHMEMSIZE)
            shmem_read = shmem_read.rstrip(chr(0))
            shmem_read = shmem_read.rstrip(" ")
            print "RECEIVED from FIRST Process: ", shmem_read
            shmem.seek(0)
            wait_result = win32event.WaitForSingleObject(hMutex, 1000)
            if (wait_result == win32event.WAIT_OBJECT_0):
                shmem.write(szMsg1)
                while (shmem.tell() != SHMEMSIZE):
                    shmem.write_byte(" ")
                shmem.seek(0)
                print "WROTE in SECOND process: ", szMsg1
                win32event.ReleaseMutex(hMutex)
            elif(wait_result == win32event.WAIT_TIMEOUT):
                 print "COULD NOT ACQUIRE MUTEX. TIMEOUT OCCURED"
            elif (wait_result == win32event.WAIT_ABONDONED):
                 print "WAIT ABANDONED"
            i = i + 1
            szMsg1 = "Message from second process using win32all and
mmap - "
            time.sleep(random.random()*2)
    except pywintypes.error, (errno, object, strerror):
        print "Error in", object, ":", strerror

    shmem.close()
    return


if __name__ == "__main__":
    main()


File: SharedMemAccess_Mutex_ctypes.py

# This application should be used with SharedMemCreate_Mutex_ctypes.py
or SharedMemCreate_Mutex_win32all.py
#~ a) Opens an existing shared memory
#~ b) Opens an existing mutex
#~ c) Reads the contents (null terminated string) of shared memory
#~ d) Acquires a mutex and then writes a null terminated string to the
shared memory
#~ e) Sleeps upto 2 seconds. Sleep time is generated by a random number
#~ f) Repeats steps (c), (d) and (e) indefintely


# There are two options to implement this code - Option A or Option B.
If Option B(A) is chosen then
# Option A(B) should be commented.
import random, time
from ctypes import *

def main():

    SHMEMSIZE = 256

    FILE_MAP_ALL_ACCESS = 0xF001F
    STANDARD_RIGHTS_REQUIRED = 0xF0000
    SYNCHRONIZE = 0x100000
    MUTANT_QUERY_STATE = 0x1
    MUTEX_ALL_ACCESS = STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE |
MUTANT_QUERY_STATE
    WAIT_OBJECT_0 = 0
    WAIT_TIMEOUT = 0x102
    WAIT_ABANDONED = 0x80
    TRUE = 1
    FALSE = 0

    szName = "MyFileMappingObject"
    szMsg1 = "Message from second process using ctypes - "
    szMutex = "MyMutexObject"

    hMap = 0
    pBuf = 0
    hMutex = 0

    try:
        hMap = windll.kernel32.OpenFileMappingA(FILE_MAP_ALL_ACCESS,
FALSE, szName)
        if (hMap == 0):
            raise WinError()

        MapViewOfFile = windll.kernel32.MapViewOfFile
        MapViewOfFile.restype = POINTER(c_char) # Option A
        pBuf = MapViewOfFile(hMap, FILE_MAP_ALL_ACCESS, 0, 0, 0)
        if (pBuf == 0):
            raise WinError()

        hMutex = windll.kernel32.OpenMutexA(MUTEX_ALL_ACCESS, FALSE,
szMutex)
        if (hMutex == 0):
            raise WinError()
        else:
            print"Opened existing mutex object", szMutex

        i=0
        random.seed()
        while 1:
            szMsg1 = szMsg1 + hex(i) + "\0"
            pBuf_str = cast(pBuf, c_char_p) # Option A
            if (len(szMsg1) > SHMEMSIZE):
                print "Current size of string message is", len(szMsg1),
"and greater than", SHMEMSIZE
                break
            print "RECEIVED from FIRST Process: ", pBuf_str.value #
Option A
            #~ print "RECEIVED from FIRST Process: ", string_at(pBuf) #
Option B
            wait_result = windll.kernel32.WaitForSingleObject(hMutex,
1000)
            if (wait_result == WAIT_OBJECT_0):
                memset(pBuf, ord(" "), SHMEMSIZE)
                cdll.msvcrt.strcpy(pBuf_str, szMsg1) # Option A
                #~ cdll.msvcrt.strcpy(pBuf, szMsg1) # Option B
                print "WROTE in SECOND process: ", szMsg1
                release_mutex = windll.kernel32.ReleaseMutex(hMutex)
                if (release_mutex == 0):
                    print "CANNOT RELEASE ACQUIRED MUTEX"
            elif(wait_result == WAIT_TIMEOUT):
                 print "COULD NOT ACQUIRE MUTEX. TIMEOUT OCCURED"
            elif (wait_result == WAIT_ABONDONED):
                 print "WAIT ABANDONED"
            i = i + 1
            szMsg1 = "Message from second process using ctypes - "
            time.sleep(random.random()*2)

        windll.kernel32.UnmapViewOfFile(pBuf)
        windll.kernel32.CloseHandle(hMap)
        return
    except WindowsError, (strerror):
        print strerror
        if (pBuf != 0):
            windll.kernel32.UnmapViewOfFile(pBuf)
        if (hMap != 0):
            windll.kernel32.CloseHandle(hMap)
        return
        
if __name__ == "__main__":
    main()




More information about the Python-list mailing list