Example of signaling and creating a Python daemon

jepler at unpythonic.net jepler at unpythonic.net
Thu Sep 15 11:31:25 EDT 2005


Here's a program I use to control volume.  Run one way, it becomes a
daemon that waits for a Unix signal and adjusts the volume according to
the signal received.  Run another way, it looks up the PID of the daemon
process and sends it a signal chosen by a command-line argument.

#!/usr/bin/env python

import pyosd, ossaudiodev, sys, signal, time
import os, optparse, signal, atexit, errno
# Font pattern (X logical font description style) handed to pyosd.osd()
# as the default font of the on-screen volume display.
FONT = "-*-helvetica-*-r-normal--34-*-*-*-*-*-*-*"
# Default mixer control name; VolumeControl looks it up in
# ossaudiodev.control_names to find the control index.
CHANNEL = "pcm"
# Mixer device path; VolumeControl.owner() checks write access to it.
DEVICE = "/dev/mixer"

def constrain(value, low, high):
    """Clamp value to the range low..high and return the result.

    The lower bound is tested first, so a value below `low` yields `low`
    even when the bounds are given in the wrong order.
    """
    if value < low:
        clamped = low
    elif value > high:
        clamped = high
    else:
        clamped = value
    return clamped

class VolumeControl:
    def __init__(self, channel=CHANNEL, font=FONT):
        p = self.p = pyosd.osd(font, colour="#4040ff", shadow=2)
        p.set_shadow_offset(2)
        p.set_vertical_offset(64)
        p.set_align(pyosd.ALIGN_CENTER)
        m = self.m = ossaudiodev.openmixer()
        self.i = ossaudiodev.control_names.index(channel)
        print "i", self.i, ossaudiodev.control_names,
        print m.get(self.i)
        self.level = m.get(self.i)[0] or 75
        self.muted = m.get(self.i)[0] == 0

    def volume_change(self, delta):
        self.level = constrain(self.level + delta, 0, 100)
        self.muted = False
        self.m.set(self.i, (self.level, self.level))
        self.show()

    def volume_up(self):
        if self.level < 10 or self.level > 85: self.volume_change(1)
        else: self.volume_change(5)
    def volume_down(self):
        if self.level <= 10 or self.level > 85: self.volume_change(-1)
        else: self.volume_change(-5)

    def mute(self):
        if self.muted:
            self.muted = False
            self.m.set(self.i, (self.level, self.level))
        else:
            self.muted = True
            self.m.set(self.i, (0,0))
        self.show()

    def show(self):
        s = "%d%%" % self.level
        if self.muted: s = "(%s)" % s
        self.p.display("Volume: %s" % s)
        self.p.show()

    def owner(self):
        return os.access(DEVICE, os.W_OK)

    def quit(self): raise SystemExit

    def run(self):
        atexit.register(unlink)
        f = open(pidfile, "w")
        f.write("%d" % os.getpid())
        f.close()

        signal.signal(signal.SIGUSR1, lambda *args: self.volume_up())
        signal.signal(signal.SIGUSR2, lambda *args: self.volume_down())
        signal.signal(signal.SIGHUP,  lambda *args: self.mute())
        signal.signal(signal.SIGINT,  lambda *args: self.quit())
        signal.signal(signal.SIGQUIT, lambda *args: self.quit())
        while self.owner(): time.sleep(1)

def findpid():
    """Return the daemon's process id as recorded in the pidfile.

    Raises IOError if the pidfile cannot be opened and ValueError if its
    contents are not an integer.
    """
    f = open(pidfile)
    try:
        contents = f.read()
    finally:
        # Close the handle even when the read fails.
        f.close()
    return int(contents)
    
def main():
    """Command-line entry point of a CD-door ("cddoor") daemon.

    NOTE(review): this definition is rebound by the later ``def main()``
    in this file, so the ``__main__`` guard never calls it.  It also calls
    ``daemon()``, which is not defined in the visible source, so it would
    fail with NameError if it were reached -- presumably a leftover from
    the program this script was adapted from; confirm before removing.
    """
    from optparse import OptionParser
    parser = OptionParser()
    parser.add_option("-d", "--no-daemon", action="store_false", dest="daemon",
                        default=True, help="Do not run in background")
    parser.add_option("-e", "--eject", dest="eject", action="store_true",
                        default=False, help="Eject the cdrom")
    parser.add_option("-q", "--quit", dest="quit", action="store_true",
                        default=False, help="Cause the daemon to exit")
    opts, args = parser.parse_args()
    #print opts
    # Client mode: --quit sends SIGQUIT to the pid stored in the pidfile.
    if opts.quit:
        if not os.path.exists(pidfile):
            raise SystemExit, "pidfile does not exist"
        os.kill(findpid(), signal.SIGQUIT)
        return
    # Client mode: --eject sends SIGHUP (no pidfile existence check here).
    if opts.eject:
        os.kill(findpid(), signal.SIGHUP)
        return
    # Daemon mode: refuse to start while the recorded pid is still alive.
    if os.path.exists(pidfile):
        p = findpid()
        try:
            # Signal 0 only tests whether the process can be signalled.
            os.kill(p, 0)
        except os.error, detail:
            # ESRCH: no such process, so the pidfile is stale.  Any other
            # errno is ignored and startup continues.
            if detail.errno == errno.ESRCH:
                print "stale pidfile exists.  removing it."
                os.unlink(pidfile)
        else:
            raise SystemExit, "valid pidfile exists.  Exiting."
    print "Starting cddoor",
    if opts.daemon:
        print "as daemon"
        daemonize()
    else:
        # Bare print ends the "Starting cddoor" line.
        print
    daemon()

def unlink():
    """Remove the pidfile; VolumeControl.run() registers this with atexit.

    A pidfile that has already been removed is not treated as an error,
    so the exit hook does not end the process with a traceback.  Any other
    OSError is re-raised.
    """
    # Single-argument parenthesised print gives the same output whether
    # print is a statement or a function.
    print("unlinking %s" % pidfile)
    try:
        os.unlink(pidfile)
    except OSError:
        # sys.exc_info() avoids the version-specific "except ..., e" syntax.
        err = sys.exc_info()[1]
        if err.errno != errno.ENOENT:
            raise

def daemonize (stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
    '''This forks the current process into a daemon.
    The stdin, stdout, and stderr arguments are file names that
    will be opened and be used to replace the standard file descriptors
    in sys.stdin, sys.stdout, and sys.stderr.
    These arguments are optional and default to /dev/null.
    Note that stderr is opened unbuffered, so
    if it shares a file with stdout then interleaved output
    may not appear in the order that you expect.
    '''
    # Do first fork.
    try: 
        pid = os.fork() 
        if pid > 0:
            sys.exit(0) # Exit first parent.
    except OSError, e: 
        sys.stderr.write ("fork #1 failed: (%d) %s\n" % (e.errno, e.strerror)   
 )
        sys.exit(1)
        
    # Decouple from parent environment.
    os.chdir("/") 
    os.umask(0) 
    os.setsid() 
    
    # Do second fork.
    try: 
        pid = os.fork() 
        if pid > 0:
            sys.exit(0) # Exit second parent.
    except OSError, e: 
        sys.stderr.write ("fork #2 failed: (%d) %s\n" % (e.errno, e.strerror)   
 )
        sys.exit(1)
        
    # Now I am a daemon!
    
    # Redirect standard file descriptors.
    si = file(stdin, 'r')
    so = file(stdout, 'a+')
    se = file(stderr, 'a+', 0)
    os.dup2(si.fileno(), sys.stdin.fileno())
    os.dup2(so.fileno(), sys.stdout.fileno())
    os.dup2(se.fileno(), sys.stderr.fileno())

# Path of the file that holds the running daemon's process id; it lives
# in the invoking user's home directory.
pidfile = os.path.join(os.path.expanduser("~"), ".volumepid")

def main():
    from optparse import OptionParser
    parser = OptionParser()
    parser.add_option("-d", "--no-daemon", action="store_false", dest="daemon",
                        default=True, help="Do not run in background")
    parser.add_option("--volume-up", dest="signal", action="store_const",
                        const=signal.SIGUSR1, default=0,
                        help="Increase the volume")
    parser.add_option("--volume-down", dest="signal", action="store_const",
                        const=signal.SIGUSR2, default=0,
                        help="Decrease the volume")
    parser.add_option("--mute", dest="signal", action="store_const",
                        const=signal.SIGHUP, default=0,
                        help="Mute or unmute the device")
    parser.add_option("-q", "--quit", dest="signal", action="store_const",
                        const=signal.SIGQUIT, default=0,
                        help="Cause the daemon to exit")
    opts, args = parser.parse_args()
    print opts
    if opts.signal:
        os.kill(findpid(), opts.signal)
        return
    if os.path.exists(pidfile):
        p = findpid()
        try:
            os.kill(p, 0)
        except os.error, detail:
            if detail.errno == errno.ESRCH:
                print "stale pidfile exists.  removing it."
                os.unlink(pidfile)
        else:
            raise SystemExit, "valid pidfile exists.  Exiting."
    print "Starting volumed",
    if opts.daemon:
        print "as daemon"
        daemonize()
    else:
        print
    app = VolumeControl()
    app.run()

# Run main() only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
-------------- next part --------------
A non-text attachment was scrubbed...
Name: not available
Type: application/pgp-signature
Size: 196 bytes
Desc: not available
URL: <http://mail.python.org/pipermail/python-list/attachments/20050915/9ef1a1ce/attachment.sig>


More information about the Python-list mailing list