question from beginner

dario keygrand at libero.it
Fri Sep 9 10:17:10 EDT 2005


Dennis, my english is not too good...

About first question this is the implementation of SER class (this is
freeware from the source www.telit.it):

#Telit Extensions
#
#Copyright © 2004, DAI Telecom S.p.A.
#All rights reserved.
#
#Redistribution and use in source and binary forms, with or without
#modification, are permitted provided that the following conditions
#are met:
#
#Redistributions of source code must retain the above copyright notice,

#this list of conditions and the following disclaimer.
#
#Redistributions in binary form must reproduce the above copyright
#notice, this list of conditions and the following disclaimer in
#the documentation and/or other materials provided with the
distribution.
#
#
#THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
``AS
#IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
#TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
#PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
#CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
#EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
#PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
#PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
#LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
#NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
#SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
import time
import serial

freeser = serial.Serial('COM4', 9600, timeout=0, rtscts=1)

def send(string):
  global freeser
  freeser.write(string)
  result = 1
  return result

def receive(timeout):
  global freeser
  sectimeout = int((timeout+5)/10)
  time.sleep(sectimeout)
  string = freeser.read(512)
  return string

def read():
  global freeser
  string = freeser.read(512)
  return string

def sendbyte(byte):
  global freeser
  string = chr(byte)
  freeser.write(string)
  result = 1
  return result

def receivebyte(timeout):
  global freeser
  sectimeout = int((timeout+5)/10)
  time.sleep(sectimeout)
  string = freeser.read(1)
  if string == '':
    result = -1
  else:
    result = ord(string)
  return result

def readbyte():
  global freeser
  string = freeser.read(1)
  if string == '':
    result = -1
  else:
    result = ord(string)
  return result

def SetSpeed(speed):
  global freeser
  if speed == '9600':
    freeser.setBaudrate(9600)
    result = 1
  elif speed == '19200':
    freeser.setBaudrate(19200)
    result = 1
  elif speed == '115200':
    freeser.setBaudrate(115200)
    result = 1
  else:
    result = -1
  return result

This is the serial class

class Serial(serialutil.FileLike):
    def __init__(self,
                 port,                  #number of device, numbering
starts at
                                        #zero. if everything fails, the
user
                                        #can specify a device string,
note
                                        #that this isn't portable
anymore
                 baudrate=9600,         #baudrate
                 bytesize=EIGHTBITS,    #number of databits
                 parity=PARITY_NONE,    #enable parity checking
                 stopbits=STOPBITS_ONE, #number of stopbits
                 timeout=None,          #set a timeout value, None to
wait forever
                 xonxoff=0,             #enable software flow control
                 rtscts=0,              #enable RTS/CTS flow control
                 ):
        """initialize comm port"""

        self.timeout = timeout

        if type(port) == type(''):       #strings are taken directly
            self.portstr = port
        else:
            self.portstr = device(port)

        try:
            self.hComPort = win32file.CreateFile(self.portstr,
                   win32con.GENERIC_READ | win32con.GENERIC_WRITE,
                   0, # exclusive access
                   None, # no security
                   win32con.OPEN_EXISTING,
                   win32con.FILE_ATTRIBUTE_NORMAL |
win32con.FILE_FLAG_OVERLAPPED,
                   None)
        except Exception, msg:
            self.hComPort = None    #'cause __del__ is called anyway
            raise serialutil.SerialException, "could not open port: %s"
% msg
        # Setup a 4k buffer
        win32file.SetupComm(self.hComPort, 4096, 4096)

        #Save original timeout values:
        self.orgTimeouts = win32file.GetCommTimeouts(self.hComPort)

        #Set Windows timeout values
        #timeouts is a tuple with the following items:
        #(ReadIntervalTimeout,ReadTotalTimeoutMultiplier,
        # ReadTotalTimeoutConstant,WriteTotalTimeoutMultiplier,
        # WriteTotalTimeoutConstant)
        if timeout is None:
            timeouts = (0, 0, 0, 0, 0)
        elif timeout == 0:
            timeouts = (win32con.MAXDWORD, 0, 0, 0, 0)
        else:
            #timeouts = (0, 0, 0, 0, 0) #timeouts are done with
WaitForSingleObject
            timeouts = (0, 0, int(timeout*1000), 0, 0)
        win32file.SetCommTimeouts(self.hComPort, timeouts)

        #win32file.SetCommMask(self.hComPort, win32file.EV_RXCHAR |
win32file.EV_TXEMPTY |
        #    win32file.EV_RXFLAG | win32file.EV_ERR)
        #~ win32file.SetCommMask(self.hComPort,
                #~ win32file.EV_RXCHAR | win32file.EV_RXFLAG |
win32file.EV_ERR)
        win32file.SetCommMask(self.hComPort, win32file.EV_ERR)

        # Setup the connection info.
        # Get state and modify it:
        comDCB = win32file.GetCommState(self.hComPort)
        comDCB.BaudRate = baudrate

        if bytesize == FIVEBITS:
            comDCB.ByteSize     = 5
        elif bytesize == SIXBITS:
            comDCB.ByteSize     = 6
        elif bytesize == SEVENBITS:
            comDCB.ByteSize     = 7
        elif bytesize == EIGHTBITS:
            comDCB.ByteSize     = 8

        if parity == PARITY_NONE:
            comDCB.Parity       = win32file.NOPARITY
            comDCB.fParity      = 0 # Dis/Enable Parity Check
        elif parity == PARITY_EVEN:
            comDCB.Parity       = win32file.EVENPARITY
            comDCB.fParity      = 1 # Dis/Enable Parity Check
        elif parity == PARITY_ODD:
            comDCB.Parity       = win32file.ODDPARITY
            comDCB.fParity      = 1 # Dis/Enable Parity Check

        if stopbits == STOPBITS_ONE:
            comDCB.StopBits     = win32file.ONESTOPBIT
        elif stopbits == STOPBITS_TWO:
            comDCB.StopBits     = win32file.TWOSTOPBITS
        comDCB.fBinary          = 1 # Enable Binary Transmission
        # Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is
set to TRUE)
        if rtscts:
            comDCB.fRtsControl  = win32file.RTS_CONTROL_HANDSHAKE
            comDCB.fDtrControl  = win32file.DTR_CONTROL_HANDSHAKE
        else:
            comDCB.fRtsControl  = win32file.RTS_CONTROL_ENABLE
            comDCB.fDtrControl  = win32file.DTR_CONTROL_ENABLE
        comDCB.fOutxCtsFlow     = rtscts
        comDCB.fOutxDsrFlow     = rtscts
        comDCB.fOutX            = xonxoff
        comDCB.fInX             = xonxoff
        comDCB.fNull            = 0
        comDCB.fErrorChar       = 0
        comDCB.fAbortOnError    = 0

        win32file.SetCommState(self.hComPort, comDCB)

        # Clear buffers:
        # Remove anything that was there
        win32file.PurgeComm(self.hComPort,
                            win32file.PURGE_TXCLEAR |
win32file.PURGE_TXABORT |
                            win32file.PURGE_RXCLEAR |
win32file.PURGE_RXABORT)

        #print win32file.ClearCommError(self.hComPort) #flags, comState
=

        self._overlappedRead = win32file.OVERLAPPED()
        self._overlappedRead.hEvent = win32event.CreateEvent(None, 1,
0, None)
        self._overlappedWrite = win32file.OVERLAPPED()
        self._overlappedWrite.hEvent = win32event.CreateEvent(None, 0,
0, None)

    def __del__(self):
        self.close()

    def close(self):
        """close port"""
        if self.hComPort:
            #Restore original timeout values:
            win32file.SetCommTimeouts(self.hComPort, self.orgTimeouts)
            #Close COM-Port:
            win32file.CloseHandle(self.hComPort)
            self.hComPort = None

    def setBaudrate(self, baudrate):
        """change baudrate after port is open"""
        if not self.hComPort: raise portNotOpenError
        # Setup the connection info.
        # Get state and modify it:
        comDCB = win32file.GetCommState(self.hComPort)
        comDCB.BaudRate = baudrate
        win32file.SetCommState(self.hComPort, comDCB)

    def inWaiting(self):
        """returns the number of bytes waiting to be read"""
        flags, comstat = win32file.ClearCommError(self.hComPort)
        return comstat.cbInQue

    def read(self, size=1):
        """read num bytes from serial port"""
        if not self.hComPort: raise portNotOpenError
        if size > 0:
            win32event.ResetEvent(self._overlappedRead.hEvent)
            flags, comstat = win32file.ClearCommError(self.hComPort)
            if self.timeout == 0:
                n = min(comstat.cbInQue, size)
                if n > 0:
                    rc, buf = win32file.ReadFile(self.hComPort,
win32file.AllocateReadBuffer(n), self._overlappedRead)

win32event.WaitForSingleObject(self._overlappedRead.hEvent,
win32event.INFINITE)
                    read = str(buf)
                else:
                    read = ''
            else:
                rc, buf = win32file.ReadFile(self.hComPort,
win32file.AllocateReadBuffer(size), self._overlappedRead)
                n = win32file.GetOverlappedResult(self.hComPort,
self._overlappedRead, 1)
                read = str(buf[:n])
        else:
            read = ''
        return read

    def write(self, s):
        """write string to serial port"""
        if not self.hComPort: raise portNotOpenError
        #print repr(s),
        if s:
            err, n = win32file.WriteFile(self.hComPort, s,
self._overlappedWrite)
            if err: #will be ERROR_IO_PENDING:
                # Wait for the write to complete.

win32event.WaitForSingleObject(self._overlappedWrite.hEvent,
win32event.INFINITE)

    def flushInput(self):
        if not self.hComPort: raise portNotOpenError
        win32file.PurgeComm(self.hComPort, win32file.PURGE_RXCLEAR |
win32file.PURGE_RXABORT)

    def flushOutput(self):
        if not self.hComPort: raise portNotOpenError
        win32file.PurgeComm(self.hComPort, win32file.PURGE_TXCLEAR |
win32file.PURGE_TXABORT)

    def sendBreak(self):
        if not self.hComPort: raise portNotOpenError
        import time
        win32file.SetCommBreak(self.hComPort)
        #TODO: how to set the correct duration??
        time.sleep(0.020)
        win32file.ClearCommBreak(self.hComPort)

    def setRTS(self,level=1):
        """set terminal status line"""
        if not self.hComPort: raise portNotOpenError
        if level:
            win32file.EscapeCommFunction(self.hComPort,
win32file.SETRTS)
        else:
            win32file.EscapeCommFunction(self.hComPort,
win32file.CLRRTS)

    def setDTR(self,level=1):
        """set terminal status line"""
        if not self.hComPort: raise portNotOpenError
        if level:
            win32file.EscapeCommFunction(self.hComPort,
win32file.SETDTR)
        else:
            win32file.EscapeCommFunction(self.hComPort,
win32file.CLRDTR)

    def getCTS(self):
        """read terminal status line"""
        if not self.hComPort: raise portNotOpenError
        return MS_CTS_ON & win32file.GetCommModemStatus(self.hComPort)
!= 0

    def getDSR(self):
        """read terminal status line"""
        if not self.hComPort: raise portNotOpenError
        return MS_DSR_ON & win32file.GetCommModemStatus(self.hComPort)
!= 0

    def getRI(self):
        """read terminal status line"""
        if not self.hComPort: raise portNotOpenError
        return MS_RING_ON & win32file.GetCommModemStatus(self.hComPort)
!= 0

    def getCD(self):
        """read terminal status line"""
        if not self.hComPort: raise portNotOpenError
        return MS_RLSD_ON & win32file.GetCommModemStatus(self.hComPort)
!= 0

#Nur Testfunktion!!
if __name__ == '__main__':
    print __name__
    s = Serial(0)



About second question I have to read a string of data from the modem
serial port but I was using .readbyte to semplify the code. In effect
.readbyte doesn't return a string type. I repeat the test with .read
method but I discovered another strange behaviour. If the string is not
'sended' to a DTE (serial not connected ot other) the .read method read
the string(s) still not 'sent'. In other words it seems that the rx and
tx serial buffer are shared.

Thanks for your time.
Dario.




More information about the Python-list mailing list