[Numpy-discussion] scipy.fftpack.diff operation question

R Schumacher rays at blue-cove.com
Tue Apr 28 16:38:58 EDT 2015


We are looking to plot to time series accelerometer data as velocity 
and displacement.
To this end we tried scipy.fftpack.diff, but in looking at the test 
code  direct_diff() we get odd results, and, why is the doc using 
"sqrt(-1)*j" in its explanation?
So, I tried  a few different integration methods.

We were first looking at the doc at
http://docs.scipy.org/doc/scipy-0.15.1/reference/generated/scipy.fftpack.diff.html
and code at
https://github.com/scipy/scipy/blob/v0.15.1/scipy/fftpack/pseudo_diffs.py#L26
as well as the tests at
https://github.com/scipy/scipy/blob/master/benchmarks/benchmarks/fftpack_pseudo_diffs.py

The direct_diff() test function in bench_pseudo_diffs.py seems odd, 
since I have to pass period=-1 to get a matching sign plot for the 
first integration.
And why the scaling differences required, even for diff() and direct_diff()?
Is my understanding fundamentally flawed?

Emacs!


  Ray 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.python.org/pipermail/numpy-discussion/attachments/20150428/821d6519/attachment.html>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: 2a6b1958.jpg
Type: image/jpeg
Size: 362369 bytes
Desc: not available
URL: <http://mail.python.org/pipermail/numpy-discussion/attachments/20150428/821d6519/attachment.jpg>
-------------- next part --------------
#-----------------------------------------------------------------------------
# Name:        plotter.py
# Purpose:     
#
# Author:      Ray Schumacher
#
# Created:     2015/04/22
# RCS-ID:      $Id: plotter.py $
# Copyright:   (c) 2015
# Licence:     
#-----------------------------------------------------------------------------
"""

"""
import numpy
import time

#------------ stand-alone test --------------------------------------------------

def intf(a, fs, f_lo=0.0, f_hi=1.0e12, times=1, winlen=1, unwin=False):
    """
    Numerically integrate a time series in the frequency domain.

    This function integrates a time series in the frequency domain using
    'Omega Arithmetic', over a defined frequency band.

    Parameters
    ----------
    a : array_like
        Inumpyut time series.
    fs : int
        Sampling rate (Hz) of the inumpyut time series.
    f_lo : float, optional
        Lower frequency bound over which integration takes place.
        Defaults to 0 Hz.
    f_hi : float, optional
        Upper frequency bound over which integration takes place.
        Defaults to the Nyquist frequency ( = fs / 2).
    times : int, optional
        Number of times to integrate inumpyut time series a. Can be either 
        0, 1 or 2. If 0 is used, function effectively applies a 'brick wall' 
        frequency domain filter to a.
        Defaults to 1.
    winlen : int, optional
        Number of seconds at the beginning and end of a file to apply half a 
        Hanning window to. Limited to half the record length.
        Defaults to 1 second.
    unwin : Boolean, optional
        Whether or not to remove the window applied to the inumpyut time series
        from the output time series.

    Returns
    -------
    out : complex ndarray
        The zero-, single- or double-integrated acceleration time series.

    Versions
    ----------
    1.1 First development version. 
        Uses rfft to avoid complex return values.
        Checks for even length time series; if not, end-pad with single zero.
    1.2 Zero-means time series to avoid spurious errors when applying Hanning
        window.

    """
    t0 = time.clock()
    EPS = numpy.finfo(float).eps *2**8
    a = a - a.mean()                        # Convert time series to zero-mean
    if numpy.mod(a.size,2) != 0:               # Check for even length time series
        odd = True
        a = numpy.append(a, 0)                 # If not, append zero to array
    else:
        odd = False
    f_hi = min(fs/2, f_hi)                  # Upper frequency limited to Nyquist
    winlen = min(a.size/2, winlen)          # Limit window to half record length

    ni = a.size                             # No. of points in data (int) 
    nf = float(ni)                          # No. of points in data (float)
    fs = float(fs)                          # Sampling rate (Hz)
    df = fs/nf                              # Frequency increment in FFT
    stf_i = int(f_lo/df)                    # Index of lower frequency bound
    enf_i = int(f_hi/df)                    # Index of upper frequency bound

    window = numpy.ones(ni)                    # Create window function
    es = int(winlen*fs)                     # No. of samples to window from ends
    edge_win = numpy.hanning(es)               # Hanning window edge 
    window[:es/2] = edge_win[:es/2]
    window[-es/2:] = edge_win[-es/2:]
    a_w = a*window

    FFTspec_a = numpy.fft.rfft(a_w)            # Calculate complex FFT of inumpyut
    FFTfreq = numpy.fft.fftfreq(ni, d=1/fs)[:ni/2+1]

    w = (2*numpy.pi*FFTfreq)                   # Omega
    iw = (0+1j)*w                           # i*Omega

    mask = numpy.zeros(ni/2+1)                 # Half-length mask for +ve freqs
    mask[stf_i:enf_i] = 1.0                 # Mask = 1 for desired +ve freqs

    if times == 2:                          # Double integration
        FFTspec = -FFTspec_a*w / (w+EPS)**3
    elif times == 1:                        # Single integration
        FFTspec = FFTspec_a*iw / (iw+EPS)**2
    elif times == 0:                        # No integration
        FFTspec = FFTspec_a
    else:
        print 'Error'

    FFTspec *= mask                         # Select frequencies to use

    out_w = numpy.fft.irfft(FFTspec)           # Return to time domain

    if unwin == True:
        out = out_w*window/(window+EPS)**2  # Remove window from time series
    else:
        out = out_w
    print 'elapsed', time.clock()-t0
    if odd == True:                         # Check for even length time series
        return out[:-1]                     # If not, remove last entry
    else:        
        return out

def direct_diff(x,k=1,period=None):
    fx = numpy.fft.fft(x)
    n = len (fx)
    if period is None:
        period = 2*numpy.pi
    w = numpy.fft.fftfreq(n)*2j*numpy.pi/period*n
    if k<0:
        with numpy.errstate(divide="ignore", invalid="ignore"):
            w = 1 / w**k
        w[0] = 0.0
    else:
        w = w**k
    if n>2000:
        w[250:n-250] = 0.0
    return numpy.fft.ifft(w*fx).real

def dc_int(sine, sample_rate):
    # fourier transform
    ft = numpy.fft.rfft(sine)
    N = len(sine)

    # bin frequencies
    bin_width = float(sample_rate) / N
    bin_freqs = numpy.arange(N//2 + 1) * bin_width  # include extra frequency for Nyquist bin
    # OK, don't divide by zero, and the Nyquist bin actually has a negative frequency
    bin_freqs[0] = bin_freqs[1]
    bin_freqs[-1] *= -1

    # integrate!
    integ_ft = ft / (2j*numpy.pi*bin_freqs)
    integ_ft[0] = 0  # zero out the DC bin

    # inverse rfft
    return numpy.fft.irfft(integ_ft)

def int2(ys):
    N = ys.shape[0]
    w = (numpy.arange(N) - N /2.) / float(N)

    # integration
    Fys = numpy.fft.fft(ys)
    with numpy.errstate(divide="ignore", invalid="ignore"):
        modFys = numpy.fft.ifftshift(1./ (2 * numpy.pi * 1j * w) * numpy.fft.fftshift(Fys))

    # modFys[0] will hold the result of dividing the DC component of y by 0, so it
    # will be nan or inf.  Setting modFys[0] to 0 amounts to choosing a specific
    # constant of integration.
    modFys[0] = 0

    return numpy.fft.ifft(modFys).real / float(N)
    
def test(pth=r'C:\Users\Ray\Dropbox (Jan Medical)\SD Datasets\Normals\Other Subject Data\Data from JH Unit\001_2008Sep12_142457.csv'):
    """ run on a sample CSV file
        
        b: blue
        g: green
        r: red
        c: cyan
        m: magenta
        y: yellow
        k: black
        w: white
    """
    import os.path
    import matplotlib.pyplot as plt
    from scipy.fftpack import diff
    from mpl_toolkits.axes_grid1 import host_subplot
    import mpl_toolkits.axisartist as AA
    
    length = 2.**14
    
    fh=open(pth,'r')
    
    ## for this sample data
    sampleRate=1024

    x = numpy.arange(length)*2*numpy.pi/length
    f = numpy.sin(x)*numpy.cos(4*x)
    plt.plot(f, label="data")
    
    intg_data = diff(f,-1)
    plt.plot(intg_data-.01, label="scipy diff - .01")
    
    intg_data1 = direct_diff(f, -1, period=-1)
    adj0 = intg_data.max()/intg_data1.max()
    plt.plot(intg_data1*adj0, label="direct_diff, adj:"+str(adj0)) # diverges wildly...
    
    intg_data3 = dc_int(f, sampleRate)
    adjdc = intg_data.max()/intg_data3.max()
    intg_data3 *= adjdc 
    plt.plot(intg_data3+.01, label="int dc +.01, adj:"+str(adjdc))
    
    int2_res = int2(f)
    adj = intg_data.max()/int2_res.max()
    int2_res *= adj 
    plt.plot(int2_res, label="int2 adj:"+str(adj))
    
    plt.legend()
    plt.draw()
    plt.show()

    intg_data = diff(f,-2)
    plt.plot(intg_data-.01, label="scipy diff - .01")
    
    intg_data1 = direct_diff(f, -2, period=1)
    adj0 = intg_data.max()/intg_data1.max()
    plt.plot(intg_data1*adj0, label="direct_diff, adj:"+str(adj0)) # diverges wildly...
    
    intg_data3 = dc_int(dc_int(f, sampleRate), sampleRate)
    adjdc = intg_data.max()/intg_data3.max()
    intg_data3 *= adjdc 
    plt.plot(intg_data3+.01, label="int dc +.01, adj:"+str(adjdc))
    
    int2_res = int2(int2(f))
    adj = intg_data.max()/int2_res.max()
    int2_res *= adj 
    plt.plot(int2_res, label="int2 adj:"+str(adj))
    
    plt.legend()
    plt.draw()
    plt.show()
        
if __name__ == '__main__':
    import sys
    if len(sys.argv)>1:
        test(sys.argv[1])
    else:
        test()


More information about the NumPy-Discussion mailing list