[Numpy-discussion] Pickle, pytables, and sqlite - loading and saving recarray's

Gael Varoquaux gael.varoquaux at normalesup.org
Fri Jul 20 10:56:28 EDT 2007


On Fri, Jul 20, 2007 at 08:35:51AM -0500, Vincent Nijs wrote:
> Sounds very interesting! Would you mind sharing an example (with code if
> possible) of how you organize your experimental data in pytables. I have
> been thinking about how I might organize my data in pytables and would luv
> to hear how an experienced user does that.

I can show you the processing code. The experiment I have close to me is
run by Matlab, the one that is fully controlled by Python is a continent
away.

Actually, I am really lazy, so I am just going to copy brutally the IO
module.

Something that can be interesting is that the data is saved by the
expirement control framework on a computer (called Krubcontrol), this
data can then be retrieve using the "fetch_files" Python command, that
puts it on the server and logs it into a data base like hash table. When
we want to retrieve the data we have a special object krubdata, which
uses some fancy indexing to retrieve by data, or specifying the keywords.

I am sorry I am not providing the code that is writing the hdf5 files, it
is an incredible useless mess, trust me. I would be able to factor out
the output code out of the 5K matlab lines. Hopefuly you'll be able to
get an idea of the structure of the hdf5 files by looking at the code
that does the loading. I haven't worked with this data for a while, so I
can't tell you

Some of the Python code might be useful to others, especially the hashing
and retrieving part. The reason why I didn't use a relational DB is that
I simply don't trust them enough for my precious data.

Gaël
-------------- next part --------------
"""
Krub.load

Routines to load the data saved by the experiment and build useful
structures out of it.

Author: Gael Varoquaux <gael.varoquaux at normalesup.org>
Copyright: Laboratoire Charles Fabry de l'Institut d'Optique
License: BSD-like

"""
# Avoid division problems
from __future__ import division

# To load hdf5
import tables
# Do not display any warnings (FIXME: this is too strict)
tables.warnings.filterwarnings('ignore')

# regular expressions
import re

import os, sys, shutil
import datetime

# Module for object persistence
import shelve

# provide globbing
from glob import glob

from numpy import array

# FIXME: This will pose problem when pytables transit to numpy.
from numarray.strings import CharArray

# FIXME: This is to much hardcoded
data_root = "/home/manip/data"
db_file_name = "/home/manip/analysis/krubDB.db"

def load_h5(file_name):
    """ Loads an hdf5 file and returns a dict with the hdf5 data in it.
    """
    file = tables.openFile(file_name)
    out_dict = {}
    for key, value in file.leaves.iteritems():
        if isinstance(value, tables.UnImplemented):
            continue
        try:
            value = value.read()
            try:
                if isinstance(value, CharArray):
                    value = value.tolist()
            except Exception, inst:
                print "Couldn't convert %s to a list" % key
                print inst
            if len(value) == 1:
                value = value[0]
            out_dict[key[1:]] = value
        except Exception, inst:
            print "couldn't load %s" % key
            print inst
    file.close()
    return(out_dict)

def load_Krub(file_name):
    """ Loads a file created by cameraview and returns a dict with the
        data restructured in a more pleasant way. 
    """
    data = load_h5(file_name)
    # Store the params in a dict
    try:
        params = {}
        for name, value in zip(data['SCparamsnames'],
                                                     data['SCparams']):
            params[name] = value
        data.update(params)
        data['params'] = params
        data.pop('SCparams')
        data.pop('SCparamsnames')
    except  Exception, inst:
        print "couldn't convert params to a dict: "
        print inst
    return data

def load_seq(file_list):
    """ Loads a sequence of hdf5 files created by cameraview and returns
        a list of dicts with the data.
    """
    return [ load_Krub(file_name) for file_name in file_list ]

def build_param_table(file_list):
    """ Scans the given list of files and returns a dictionary of 
        dictionaries discribing the files, and the experimental parameters.
    """
    out_dict = {}
    for file_name in file_list:
        data = load_Krub(file_name)
        if 'params' in data:
            params = data['params']
        else:
            params = {}
        params['filename'] = file_name
        if 'sequencename' in data: params['sequencename'] = data['sequencename']
        if 'fitfunction' in data: params['fitfunction'] = data['fitfunction']
        if 'loopposition' in data: params['loopposition'] = data['loopposition']
        if 'roi' in data: params['roi'] = data['roi']
        # Check that the filename has the timestamp
        if re.match(r".*\d\d_\d\d_\d\d", file_name[:-3]):
            params['time'] = int( file_name[-11:-9] + 
                                    file_name[-8:-6] +
                                    file_name[-5:-3] )
        # Check whether the directory of the file has the datestamp.
        full_path = os.path.abspath(file_name)
        params['fullpath'] = full_path
        dir_path = full_path.replace(data_root+os.sep,'')
        dir_name = dir_path.split(os.sep)[0]
        if re.match(r"\d\d\d\d\d\d", dir_name):
            params['date'] = int(dir_name)
        out_dict[full_path] = params
        # Delete manually the data, let us not trust the garbage collector
        # here: we cannot afford wasting memory
        del data
        print >>sys.stderr, ".",
    return out_dict

def add_files(file_list):
    """ Adds the given files to the Krub database.
    """
    # An ugly hack to change the file permissions even if we do not own
    # the file: start a new file, and replace the old one with the new
    # one.
    hash_table =  build_param_table(file_list)
    dbase_new = shelve.open(db_file_name + "new")
    dbase_old = shelve.open(db_file_name)
    dbase_new.update(dbase_old)
    dbase_new.update(hash_table)
    dbase_old.close()
    dbase_new.close()
    os.chmod(db_file_name + "new", 0777)
    shutil.move(db_file_name, db_file_name + "old")
    shutil.move(db_file_name + "new", db_file_name)

def rebuild_db():
    """ Rescans the complete data directories to rebuild the database.
    """
    database = {}
    for dirpath, dirnames, filenames in os.walk(data_root):
        print "\nscanning ", dirpath
        h5files = [dirpath + os.sep + filename for filename in filenames 
                                                    if filename[-3:]==".h5"]
        database.update(build_param_table(h5files))
    os.rename(db_file_name, db_file_name+"back")
    dbase = shelve.open(db_file_name)
    dbase.update(database)
    dbase.close()
    os.chmod(db_file_name, 0777)

def query_db(**kwargs):
    """ Queries the database to find files matching certain parameters.
        Returns the database entries (dictionnaries) of these files.

        >>> query_db(molasse_time=8., seq_name='FORT_2b', mot_load_time_s= 6.)
    """
    dbase = shelve.open(db_file_name)
    out_dict = {}
    for file_name, params in dbase.iteritems():
        store = True
        for param, value in kwargs.iteritems():
            if param in params:
                if not params[param] == value:
                    store = False
                    break
        if store:
            out_dict[file_name] = params
    dbase.close()
    return out_dict

def select_seq(seq, **kwargs):
    """ Selects filenames in the given list according to the specified
        parameters. The files must be in the database.

        >>> select_seq(krubdata[:], seq_name='FORT_2b')
    """
    # FIXME: This is way to much copied and pasted from query_db
    dbase = shelve.open(db_file_name)
    out_list = []
    for file_name in seq:
        params = dbase[file_name]
        store = True
        for param, value in kwargs.iteritems():
            if param in params:
                if not params[param] == value:
                    store = False
                    break
            else:
                store = False
                break
        if store:
            out_list += [file_name, ]
    dbase.close()
    return out_list

def extract_param(seq, param_name):
    """ Return an array with all the values the given parameter takes in
        the sequence of file names given.
    """
    dbase = shelve.open(db_file_name)
    out_list = []
    for file_name in seq:
        params = dbase[file_name]
        if param_name in params:
            out_list += [ params[param_name], ]
    # Use a set to have unique entries:
    out_list = array(list(set(out_list)))
    out_list.sort()
    return out_list

###########################################################################
# Hack to use the gnome-vfs to update the files from Krubcontrol
###########################################################################

import gnomevfs

FLAGS =  gnomevfs.PERM_USER_ALL + gnomevfs.PERM_GROUP_ALL + \
         gnomevfs.PERM_OTHER_ALL

def fetch_files():
    """ updates the data from krubcontrol """
    if not gnomevfs.exists('smb://krubcontrol/data'):
        raise IOError, "Cannot connect to Krubcontrol"
    file_list = _walk_gnomevfs('smb://krubcontrol/data/Manip/data')
    if len(file_list) == 0:
        print "Nothing new"
    else:
        print "Adding files to database"
        add_files([file_name for file_name in file_list 
                        if file_name[-3:]=='.h5' ])

def _walk_gnomevfs(uri, base='smb://krubcontrol/data/Manip/data'):
    """ Private function used to scan remote windows drives """
    file_list = []
    dir_iterator = gnomevfs.open_directory(uri)
    for entry in dir_iterator:
        if entry.name[0] == '.':
            continue
        entry_uri = uri + "/" + entry.name
        local_uri = entry_uri.replace(base,"file://" + data_root)
        disk_uri = local_uri.replace("file://", "")
        if entry.type == gnomevfs.FILE_TYPE_DIRECTORY:
            if not gnomevfs.exists(local_uri):
                gnomevfs.make_directory(local_uri, FLAGS)
                os.chmod(disk_uri, 0777)
            file_list += _walk_gnomevfs(entry_uri)
        else:
            if not gnomevfs.exists(local_uri):
                file_list += [disk_uri, ]
                print "uploading :", entry_uri
                inuri = gnomevfs.URI(entry_uri)
                outuri = gnomevfs.URI(local_uri)
                gnomevfs.xfer_uri(inuri, outuri,
                                  gnomevfs.XFER_DEFAULT,
                                  gnomevfs.XFER_ERROR_MODE_ABORT,
                                  gnomevfs.XFER_OVERWRITE_MODE_SKIP)
                os.chmod(disk_uri, 0777)
    return file_list

class KrubData(object):
    """ An indexed object to access the data stored in the database.
        
        This object returns a list of file names pointing to data
        matching given criteria. It can be called with one or to indexing
        parameters: the first parameter is the hour indexes of the data,
        in the form "hhmmss", as an integer, with no leading zeros. The
        second indexing parameter is the data. If it is omitted it
        defaults to the current day.

        >>> krubdata[150833]
        ['/home/manip/data/061016/FORT_2b_15_08_33.h5']
        >>> krubdata[150833,61016]
        ['/home/manip/data/061016/FORT_2b_15_08_33.h5']
        
        Time indexes support slices:
        
        >>> krubdata[150700:150800,61016]
        ['/home/manipdata/061016/FORT_2b_15_07_04.h5',
         '/home/manip/data/061016/FORT_2b_15_07_19.h5',
         '/home/manip/data/061016/FORT_2b_15_07_34.h5',
         '/home/manip/data/061016/FORT_2b_15_07_48.h5']
        >>> krubdata[150700:150800:2,61016]         # Skip 1 out of 2
        ['/home/manip/data/061016/FORT_2b_15_07_04.h5',
         '/home/manip/data/061016/FORT_2b_15_07_34.h5']

        Both times and date can be called with negative integers. The
        indexes then refer to the nth last day, or shot:

        >>> krubdata[194900:,-1]        # Data taken yesterday, after 19:49
        ['/home/manip/data/061018/FORT_2b_19_49_05.h5']
        >>> krubdata[-2:,]              # Last 2 shots
        ['/home/manip/data/061018/FORT_2b_19_48_53.h5',
         '/home/manip/data/061018/FORT_2b_19_49_05.h5']
        
        *see also:* query_db, build_param_table, and the doc for Krub.io
        WARNING : do not write 0 in front of the date : for 06.12.13 write 
        61213 and not 061213
    """

    def __getitem__(self, *args):
        """ Use the indexing to retrive the data. First set of index is the 
            time in hhmmss. Leading zeros should be suppressed.
        """
        # Only one index given, date is today:
        today = datetime.date.today()
        # I don't now why the args are passed in a tuple, if there is a
        # date argument. Lets get rid of this
        if isinstance(args[0], tuple) :
            args = args[0]
        # Parse the date argument.
        if len(args)==1:
            print "No date index given, defaulting to today."
            date = int(today.strftime('%y%m%d'))
        elif args[1]<0:
            date = today - datetime.timedelta(days=-args[1])
            date = int(date.strftime('%y%m%d'))
        else:
            date = args[1]
        # Parse the time argument
        time_segment = None
        time_start = None
        time_stop = None
        time = None
        if not isinstance(args[0], slice):
            # If this is not a slice, it must be an int
            if args[0]<0:
                # Counting from the back. Make it a one-spaced slice, to
                # reuse our back-counting code.
                relative_time_start = args[0]
                relative_time_stop = args[0]+1
                time_step = None
                time_segment = True
            else:
                time=args[0]
        if isinstance(args[0], slice):
            relative_time_start = None
            relative_time_stop = None
            time_step = None
            time_segment = args[0]
            if time_segment.start and time_segment.start<0:
                relative_time_start = time_segment.start
            elif time_segment.start:
                time_start = time_segment.start-1
            else:
                time_start = time_segment.start
            if time_segment.stop and time_segment.stop<0:
                relative_time_stop = time_segment.stop
            elif time_segment.stop:
                time_stop = time_segment.stop+1
            else:
                time_stop = time_segment.stop
            if time_segment.step:
                time_step = time_segment.step
        # Open the database
        dbase = shelve.open(db_file_name)
        out_list = []
        for file_name, params in dbase.iteritems():
            if not ('date' in params and params['date'] == date) :
                continue
            if not 'time' in params :
                continue
            if time and not params['time'] == time :
                continue
            if time_start and not params['time'] > time_start :
                continue
            if time_stop and not params['time'] < time_stop :
                continue
            out_list += [file_name, ]
        # Now deal with the relative times, and the step
        if time_segment:
            # We need to sort the list by time.
            get_time = lambda x: dbase[x]['time']
            out_list.sort(key=get_time)
            out_list = out_list[
                relative_time_start:relative_time_stop:time_step]
        dbase.close()
        return out_list

krubdata = KrubData()


More information about the NumPy-Discussion mailing list