New to Programming: Adding custom functions with ipynotify classes
Saran A
ahlusar.ahluwalia at gmail.com
Thu Apr 2 21:30:42 EDT 2015
Hello All:
Here is the program that I am trying to write (with specs):
* Monitors a folder for files that are dropped throughout the day
* When a file is dropped in the folder the program should scan the file
o IF all the records in the file have the same length (line length)
o THEN the file should be moved to a "success" folder and a text file written indicating the total number of records processed
o IF the file is empty OR the records are not all of the same length
o THEN the file should be moved to a "failure" folder and a text file written indicating the cause for failure (for example: Empty file or line 100 was not the same length as the rest).
Many on forums suggest using pyinotify. I am wondering how to combine my current script with pyinotify.
Below is my original script (the ipynotify script is provided after this)
[code]
# # # Without data to examine here, I can only guess based on this requirement's language that
# # fixed records are in the input.
##I made the assumption that the directories are in the same filesystem
# # Takes the function fileinfo as a starting point and demonstrates calling a function from within a function.
# I tested this little sample on a small set of files created with MD5 checksums. I wrote the Python in such a way as it
# would work with Python 2.x or 3.x (note the __future__ at the top).
# # # There are so many wonderful ways of failure, so, from a development standpoint, I would probably spend a bit
# # more time trying to determine which failure(s) I would want to report to the user, and how (perhaps creating my own Exceptions)
# # # The only other comments I would make are about safe-file handling.
# # # #1: Question: After a user has created a file that has failed (in
# # # processing),can the user create a file with the same name?
# # # If so, then you will probably want to look at some sort
# # # of file-naming strategy to avoid overwriting evidence of
# # # earlier failures.
# # # File naming is a tricky thing. I referenced the tempfile module [1] and the Maildir naming scheme to see two different
# # types of solutions to the problem of choosing a unique filename.
## I am assuming that all of my files are going to be specified in unicode
## Utilized Spyder's Scientific Computing IDE to debug, check for indentation errors and test function suite
from __future__ import print_function
import os.path
import time
import difflib
import logging
def initialize_logger(output_dir):
    """Attach one console handler and two file handlers to the root logger.

    All handlers use the ``LEVEL - message`` format:

    * console handler at INFO and above;
    * ``<output_dir>/error.log`` at ERROR and above, opened lazily
      (``delay=True``) so the file is only created once an error is logged;
    * ``<output_dir>/all.log`` at DEBUG and above.

    Both files are opened in ``"w"`` mode, so previous contents are
    truncated.  Every call adds three more handlers to the root logger, so
    call this once per process to avoid duplicated output.

    Args:
        output_dir: Existing directory that receives the two log files.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    # One formatter instance is enough; handlers only read from it.
    formatter = logging.Formatter("%(levelname)s - %(message)s")

    error_path = os.path.join(output_dir, "error.log")
    debug_path = os.path.join(output_dir, "all.log")
    handlers = (
        # console handler, level info
        (logging.StreamHandler(), logging.INFO),
        # error file handler, level error (delay must be a real boolean)
        (logging.FileHandler(error_path, "w", encoding=None, delay=True),
         logging.ERROR),
        # debug file handler, level debug
        (logging.FileHandler(debug_path, "w"), logging.DEBUG),
    )
    for handler, level in handlers:
        handler.setLevel(level)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
#This function's purpose is to obtain the filename, rootdir and filesize
def fileinfo(f):
    """Return ``(filename, rootdir, filesize)`` for the path *f*.

    Args:
        f: Path to an existing file.

    Returns:
        A 3-tuple: the final path component, the directory part of the
        path (an empty string when *f* has no directory component), and
        the size in bytes reported by ``os.path.getsize``.

    Raises:
        OSError: If ``os.path.getsize`` cannot stat *f*.
    """
    rootdir, filename = os.path.split(f)
    return filename, rootdir, os.path.getsize(f)
#This helper function returns the length of the file
def file_len(f):
    """Return the number of lines in the file at path *f*.

    An empty file yields 0.  A last line without a trailing newline still
    counts as a line.

    Args:
        f: Path of the file to read.
    """
    # A separate name for the handle keeps the path parameter intact.
    with open(f) as handle:
        return sum(1 for _ in handle)
#This helper function attempts to copy file and move file to the respective directory
#I am assuming that the directories are in the same filesystem
# If directories ARE in different file systems, I would use the following helper function:
# def move(src, dest):
# shutil.move(src, dest)
def copy_and_move_file(src, dest):
    """Move *src* to *dest* with ``os.rename``, reporting failures on stdout.

    This is a best-effort helper: a failed rename is printed, not raised,
    and the function returns ``None`` either way.

    Args:
        src: Path of the file to move.
        dest: Full destination path, including the file name.
    """
    try:
        os.rename(src, dest)
    # os.rename raises OSError.  On Python 2 IOError is a separate class,
    # so catching IOError would let the error escape.
    except OSError as e:
        print('Error: %s' % e.strerror)
# Directory whose entries are listed below; "." is the current working directory.
path = "."
# Listing taken once, when this module is loaded; it is the value handed to
# main() by the entry-point guard at the bottom of the script.
dirlist = os.listdir(path)
# A caveat of the "main" function is that it does not scale well
#(although it is appropriate if one assumes that there will be few changes)
# It does not account for updated files existing in the directory - only new files "dropped" in
# (If this was included in the requirements, os.stat would be appropriate here)
def main(dirlist, path=".", poll_interval=1, max_polls=None):
    """Poll *path* and hand every newly appearing file to ``validate_files``.

    The directory is re-listed on every poll and compared with the names
    already seen, so files dropped while the program is running are
    detected.  Each new regular file is validated on its own; new
    sub-directories are ignored.

    Args:
        dirlist: Names already present when monitoring starts; they are
            treated as seen and never validated.
        path: Directory to monitor.
        poll_interval: Seconds to sleep before each directory check.
        max_polls: Stop after this many checks; ``None`` polls forever.

    Returns:
        ``None`` once ``max_polls`` checks have been made.  With the default
        ``max_polls=None`` the function does not return.
    """
    seen = set(dirlist)
    polls = 0
    while max_polls is None or polls < max_polls:
        time.sleep(poll_interval)  # time between update checks
        polls += 1
        current = set(os.listdir(path))
        # sorted() only makes the processing order deterministic.
        for name in sorted(current - seen):
            candidate = os.path.join(path, name)
            if not os.path.isfile(candidate):
                continue
            print('Sucessfully added %s file - ready to validate' % name)
            validate_files(candidate)
        seen = current
def validate_files(f):
    """Validate the file at path *f* and move it to the matching folder.

    A file is valid when it has at least one line and every line has the
    same length (line terminators excluded).  Valid files go to the
    ``Success`` folder, everything else to the ``Failure`` folder.

    Args:
        f: Path of the file to check.

    Returns:
        Whatever the chosen move function returns: the record count on
        success, the path of the written error report on failure.
    """
    reason = _failure_reason(f)
    if reason is None:
        return move_to_success_folder_and_read(f)
    return move_to_failure_folder_and_return_error_file(f, reason)


def _failure_reason(f):
    """Return why *f* is invalid, or ``None`` when all lines share a length."""
    expected = None
    with open(f) as records:
        for number, record in enumerate(records, 1):
            length = len(record.rstrip('\r\n'))
            if expected is None:
                # The first line defines the length the others must match.
                expected = length
            elif length != expected:
                return 'line %d was not the same length as the rest' % number
    if expected is None:
        return 'Empty file'
    return None


def _ensure_dir(path):
    """Create directory *path* unless it already exists."""
    if not os.path.isdir(path):
        os.makedirs(path)


# Failure/Success Folder Functions
def move_to_failure_folder_and_return_error_file(
        f, reason="Either this file is empty or there are no lines"):
    """Move *f* into a ``Failure`` folder and write a report with the cause.

    The folder is created next to *f* when missing.  The report is written
    as ``<filename>.error.txt`` inside that folder and the cause is also
    sent to the root logger.

    Args:
        f: Path of the rejected file.
        reason: Human-readable cause of the rejection.

    Returns:
        Path of the error report that was written.
    """
    filename, rootdir, _ = fileinfo(f)
    failure_dir = os.path.join(rootdir, 'Failure')
    _ensure_dir(failure_dir)
    copy_and_move_file(f, os.path.join(failure_dir, filename))

    error_file = os.path.join(failure_dir, filename + '.error.txt')
    with open(error_file, 'w') as report:
        report.write('%s: %s\n' % (filename, reason))

    # Configure logging only once: initialize_logger adds new handlers to
    # the root logger on every call.
    if not logging.getLogger().handlers:
        initialize_logger(failure_dir)
    logging.error("%s: %s", filename, reason)
    return error_file


def move_to_success_folder_and_read(f):
    """Move *f* into a ``Success`` folder and record how many lines it has.

    The folder is created next to *f* when missing.  The count is written
    to ``<filename>.count.txt`` inside that folder.

    Args:
        f: Path of the accepted file.

    Returns:
        The number of records (lines) in the file.
    """
    filename, rootdir, _ = fileinfo(f)
    # Count before moving: afterwards the original path no longer exists.
    records = file_len(f)
    success_dir = os.path.join(rootdir, 'Success')
    _ensure_dir(success_dir)
    copy_and_move_file(f, os.path.join(success_dir, filename))

    count_file = os.path.join(success_dir, filename + '.count.txt')
    with open(count_file, 'w') as report:
        report.write('%s: %d records processed\n' % (filename, records))

    print("Success %s" % f)
    return records
if __name__ == '__main__':
    # Start monitoring only when executed as a script, not when imported.
    main(dirlist)
[/code]
Here is the pyinotify script that I have tried writing, following the tutorial:
[code]
# My version for w: monitors events and logs them into a log file.
#
import os.path
from pyinotify import pyinotify
# Imported here because this script never imports datetime, so the
# assignment below would otherwise fail with a NameError.
from datetime import datetime

# Time at which the script was loaded (evaluated once, not per event).
timestamp = datetime.today()  # time_record
mask = pyinotify.IN_CREATE | pyinotify.IN_MOVED_TO  # watched events
class EventHandler(pyinotify.ProcessEvent):
    """Print create/move events and append them to a log file."""

    # The original opened the Desktop directory itself, which cannot be
    # opened for appending.  The file name below is a placeholder choice --
    # NOTE(review): point this at the intended log file.
    log_path = os.path.join('/Users/sahluwalia/Desktop/', 'pyinotify_events.log')

    def _record(self, label, event):
        """Print *event* under *label* and append one line to the log file."""
        # Local import: this script does not import datetime at the top.
        from datetime import datetime

        print("%s: %s " % (label, os.path.join(event.path, event.name)))
        # Time is taken per event so that each entry carries its own stamp.
        stamp = datetime.today().strftime('%c')
        with open(self.log_path, 'a') as event_log:
            event_log.write(event.name + ' - ' + stamp + '\n')

    def process_IN_CREATE(self, event):
        """Handle an IN_CREATE event."""
        self._record("Created", event)

    def process_IN_MOVED_TO(self, event):
        """Handle an IN_MOVED_TO event."""
        self._record("Moved", event)
# The watch manager must exist before a Notifier can be built on top of it.
# NOTE(review): no watch is registered here, so `mask` is still unused --
# confirm which directory this notifier is meant to observe.
wm = pyinotify.WatchManager()
handler = EventHandler()  # instantiated EventHandler class
notifier = pyinotify.Notifier(wm, handler)
class Watcher(pyinotify.ProcessEvent):
    """Watch ``watchdir`` and print the stripped lines of each new file.

    Modified to process and read a file when it is created in, or moved
    into, the watched directory.
    """

    watchdir = '/tmp/watch'

    def __init__(self):
        """Create the watch and start the notifier thread."""
        pyinotify.ProcessEvent.__init__(self)
        wm = pyinotify.WatchManager()
        self.notifier = pyinotify.ThreadedNotifier(wm, self)
        # Watch both event types this class has handlers for, using the same
        # module-level constants as the rest of the script.
        wm.add_watch(self.watchdir,
                     pyinotify.IN_CREATE | pyinotify.IN_MOVED_TO)
        print("Watching %s" % self.watchdir)
        self.notifier.start()

    def process_IN_CREATE(self, event):
        """Print and parse a file that was just created."""
        # NOTE(review): the file may still be being written when this event
        # fires -- confirm whether a close-after-write event suits better.
        print("Seen: %s" % (event,))
        self._show(event)

    def process_IN_MOVED_TO(self, event):
        """Print and parse a file that was moved into the directory."""
        print("Moved: %s " % os.path.join(event.path, event.name))
        self._show(event)

    def _show(self, event):
        """Parse the file named by *event* and print its lines."""
        pathname = os.path.join(event.path, event.name)
        print(self._parse(pathname))

    def _parse(self, filename):
        """Return the lines of *filename* with surrounding whitespace removed."""
        with open(filename) as handle:
            return [line.strip() for line in handle]
class Log(pyinotify.ProcessEvent):
    """Write every received event to a caller-supplied file object."""

    def my_init(self, fileobj):
        """
        Hook called automatically from ProcessEvent.__init__(). Extra
        keyword arguments given to ProcessEvent.__init__() are forwarded
        here; fileobj is one of them and is kept for later writes.
        """
        self._fileobj = fileobj

    def process_default(self, event):
        """Append the event's string form as one line and flush at once."""
        sink = self._fileobj
        line = str(event) + '\n'
        sink.write(line)
        sink.flush()
class TrackModifications(pyinotify.ProcessEvent):
    """Print a fixed marker whenever an IN_MODIFY event is processed."""

    def process_IN_MODIFY(self, event):
        """Print the marker; the event itself is not inspected."""
        print('IN_MODIFY')
class Empty(pyinotify.ProcessEvent):
    """Inherited class that displays a fixed message for each event."""

    def my_init(self, msg):
        """Remember the message passed as the ``msg`` keyword argument."""
        self._msg = msg

    def process_default(self, event):
        """Print the stored message; the event itself is not inspected."""
        message = self._msg
        print(message)
# pyinotify.log.setLevel(10)
# Driver section: opens a log file, then processes notifier events in a loop.
# NOTE(review): the nesting of the try/except/finally blocks below was lost
# with the indentation, so the intended structure must be confirmed before
# this section can be repaired.
# NOTE(review): file() is a builtin only on Python 2, and '/Failure' is a path
# at the filesystem root -- confirm both are intended.
filelog = file('/Failure', 'w')
while True:
try:
# Handle events that were already read, then read any that are pending.
notifier.process_events()
if notifier.check_events():
notifier.read_events()
try:
# It is important to pass named extra arguments like 'fileobj'
# The three handlers are chained: Log is wrapped by TrackModifications, which
# is wrapped by Empty.
handler = Empty(TrackModifications(Log(fileobj=filelog)), msg='This is an error message or notificaiton that will be logged ')
# Rebinds the module-level name `notifier` to a new Notifier.
notifier = pyinotify.Notifier(wm, default_proc_fun=handler)
wm.add_watch('/tmp', pyinotify.ALL_EVENTS)
notifier.loop()
# NOTE(review): filelog is also closed in the finally clause below.
filelog.close()
except KeyboardInterrupt:
# Ctrl-C: stop the notifier and leave the while loop.
notifier.stop()
break
finally:
filelog.close()
# Entry point: instantiating Watcher starts its notifier from __init__.
if __name__ == '__main__':
Watcher()
[/code]
More information about the Python-list
mailing list