Nukolas
Nukolas

Reputation: 495

Modifying the scipy.signal.welch method to reject certain spectra before averaging

I am trying to isolate the spectral characteristics of background seismic noise from a time-series which contains both the background noise and signal from several transient events (e.g. earthquakes).

To do this I am using the scipy.signal.welch method, which essentially chops up my time series into smaller segments, calculated the fourier transform for each segment and then averages the result and returns the "welch spectrum".

What I want to do is to modify scipy.signal.welch so that the spectra for segments which are contaminated by the transient signals are rejected from the averaging process, using some criteria which allows recognition of the contaminated spectra (e.g. and RMS amplitude threshold of the spectra or maybe even the time series segment - to be decided).

I have the relevant source for the scipy.signal.welch method and I have isolated the relevant code down to the "spectral_helper" and "_fft_helper" functions, but I can't identify where the averaging of the spectra takes place, or where is the best place to implement my spectra rejection operation.

I am not quite skilled enough to fully understand the scipy code and modify it for my purposes, can someone help me to work out the best way to do this?

Here is the function which validates and formats the type of spectra to be calculated, and calls the function which does the fft:

def _spectral_helper(x, y, fs=1.0, window='hann', nperseg=256,
                noverlap=None, nfft=None, detrend='constant',
                return_onesided=True, scaling='spectrum', axis=-1,
                mode='psd'):
"""
Calculate various forms of windowed FFTs for PSD, CSD, etc.
This is a helper function that implements the commonality between the
psd, csd, and spectrogram functions. It is not designed to be called
externally. The windows are not averaged over; the result from each window
is returned.
Parameters
---------
x : array_like
    Array or sequence containing the data to be analyzed.
y : array_like
    Array or sequence containing the data to be analyzed. If this is
    the same object in memoery as x (i.e. _spectral_helper(x, x, ...)),
    the extra computations are spared.
fs : float, optional
    Sampling frequency of the time series. Defaults to 1.0.
window : str or tuple or array_like, optional
    Desired window to use. See `get_window` for a list of windows and
    required parameters. If `window` is array_like it will be used
    directly as the window and its length will be used for nperseg.
    Defaults to 'hann'.
nperseg : int, optional
    Length of each segment.  Defaults to 256.
noverlap : int, optional
    Number of points to overlap between segments. If None,
    ``noverlap = nperseg // 2``.  Defaults to None.
nfft : int, optional
    Length of the FFT used, if a zero padded FFT is desired.  If None,
    the FFT length is `nperseg`. Defaults to None.
detrend : str or function or False, optional
    Specifies how to detrend each segment. If `detrend` is a string,
    it is passed as the ``type`` argument to `detrend`.  If it is a
    function, it takes a segment and returns a detrended segment.
    If `detrend` is False, no detrending is done.  Defaults to 'constant'.
return_onesided : bool, optional
    If True, return a one-sided spectrum for real data. If False return
    a two-sided spectrum. Note that for complex data, a two-sided
    spectrum is always returned.
scaling : { 'density', 'spectrum' }, optional
    Selects between computing the cross spectral density ('density')
    where `Pxy` has units of V**2/Hz and computing the cross spectrum
    ('spectrum') where `Pxy` has units of V**2, if `x` and `y` are
    measured in V and fs is measured in Hz.  Defaults to 'density'
axis : int, optional
    Axis along which the periodogram is computed; the default is over
    the last axis (i.e. ``axis=-1``).
mode : str, optional
    Defines what kind of return values are expected. Options are ['psd',
    'complex', 'magnitude', 'angle', 'phase'].
Returns
-------
freqs : ndarray
    Array of sample frequencies.
t : ndarray
    Array of times corresponding to each data segment
result : ndarray
    Array of output data, contents dependant on *mode* kwarg.
References
----------
.. [1] Stack Overflow, "Rolling window for 1D arrays in Numpy?",
    http://stackoverflow.com/a/6811241
.. [2] Stack Overflow, "Using strides for an efficient moving average
    filter", http://stackoverflow.com/a/4947453
Notes
-----
Adapted from matplotlib.mlab
.. versionadded:: 0.16.0
"""
if mode not in ['psd', 'complex', 'magnitude', 'angle', 'phase']:
    raise ValueError("Unknown value for mode %s, must be one of: "
                     "'default', 'psd', 'complex', "
                     "'magnitude', 'angle', 'phase'" % mode)

# If x and y are the same object we can save ourselves some computation.
same_data = y is x

if not same_data and mode != 'psd':
    raise ValueError("x and y must be equal if mode is not 'psd'")

axis = int(axis)

# Ensure we have np.arrays, get outdtype
x = np.asarray(x)
if not same_data:
    y = np.asarray(y)
    outdtype = np.result_type(x,y,np.complex64)
else:
    outdtype = np.result_type(x,np.complex64)

if not same_data:
    # Check if we can broadcast the outer axes together
    xouter = list(x.shape)
    youter = list(y.shape)
    xouter.pop(axis)
    youter.pop(axis)
    try:
        outershape = np.broadcast(np.empty(xouter), np.empty(youter)).shape
    except ValueError:
        raise ValueError('x and y cannot be broadcast together.')

if same_data:
    if x.size == 0:
        return np.empty(x.shape), np.empty(x.shape), np.empty(x.shape)
else:
    if x.size == 0 or y.size == 0:
        outshape = outershape + (min([x.shape[axis], y.shape[axis]]),)
        emptyout = np.rollaxis(np.empty(outshape), -1, axis)
        return emptyout, emptyout, emptyout

if x.ndim > 1:
    if axis != -1:
        x = np.rollaxis(x, axis, len(x.shape))
        if not same_data and y.ndim > 1:
            y = np.rollaxis(y, axis, len(y.shape))

# Check if x and y are the same length, zero-pad if neccesary
if not same_data:
    if x.shape[-1] != y.shape[-1]:
        if x.shape[-1] < y.shape[-1]:
            pad_shape = list(x.shape)
            pad_shape[-1] = y.shape[-1] - x.shape[-1]
            x = np.concatenate((x, np.zeros(pad_shape)), -1)
        else:
            pad_shape = list(y.shape)
            pad_shape[-1] = x.shape[-1] - y.shape[-1]
            y = np.concatenate((y, np.zeros(pad_shape)), -1)

# X and Y are same length now, can test nperseg with either
if x.shape[-1] < nperseg:
    warnings.warn('nperseg = {0:d}, is greater than input length = {1:d}, '
                  'using nperseg = {1:d}'.format(nperseg, x.shape[-1]))
    nperseg = x.shape[-1]

nperseg = int(nperseg)
if nperseg < 1:
    raise ValueError('nperseg must be a positive integer')

if nfft is None:
    nfft = nperseg
elif nfft < nperseg:
    raise ValueError('nfft must be greater than or equal to nperseg.')
else:
    nfft = int(nfft)

if noverlap is None:
    noverlap = nperseg//2
elif noverlap >= nperseg:
    raise ValueError('noverlap must be less than nperseg.')
else:
    noverlap = int(noverlap)

# Handle detrending and window functions
if not detrend:
    def detrend_func(d):
        return d
elif not hasattr(detrend, '__call__'):
    def detrend_func(d):
        return signaltools.detrend(d, type=detrend, axis=-1)
elif axis != -1:
    # Wrap this function so that it receives a shape that it could
    # reasonably expect to receive.
    def detrend_func(d):
        d = np.rollaxis(d, -1, axis)
        d = detrend(d)
        return np.rollaxis(d, axis, len(d.shape))
else:
    detrend_func = detrend

if isinstance(window, string_types) or type(window) is tuple:
    win = get_window(window, nperseg)
else:
    win = np.asarray(window)
    if len(win.shape) != 1:
        raise ValueError('window must be 1-D')
    if win.shape[0] != nperseg:
        raise ValueError('window must have length of nperseg')

if np.result_type(win,np.complex64) != outdtype:
    win = win.astype(outdtype)

if mode == 'psd':
    if scaling == 'density':
        scale = 1.0 / (fs * (win*win).sum())
    elif scaling == 'spectrum':
        scale = 1.0 / win.sum()**2
    else:
        raise ValueError('Unknown scaling: %r' % scaling)
else:
    scale = 1

if return_onesided is True:
    if np.iscomplexobj(x):
        sides = 'twosided'
    else:
        sides = 'onesided'
        if not same_data:
            if np.iscomplexobj(y):
                sides = 'twosided'
else:
    sides = 'twosided'

if sides == 'twosided':
    num_freqs = nfft
elif sides == 'onesided':
    if nfft % 2:
        num_freqs = (nfft + 1)//2
    else:
        num_freqs = nfft//2 + 1

# Perform the windowed FFTs
result = _fft_helper(x, win, detrend_func, nperseg, noverlap, nfft)
result = result[..., :num_freqs]
freqs = fftpack.fftfreq(nfft, 1/fs)[:num_freqs]

if not same_data:
    # All the same operations on the y data
    result_y = _fft_helper(y, win, detrend_func, nperseg, noverlap, nfft)
    result_y = result_y[..., :num_freqs]
    result = np.conjugate(result) * result_y
elif mode == 'psd':
    result = np.conjugate(result) * result
elif mode == 'magnitude':
    result = np.absolute(result)
elif mode == 'angle' or mode == 'phase':
    result = np.angle(result)
elif mode == 'complex':
    pass

result *= scale
if sides == 'onesided':
    if nfft % 2:
        result[...,1:] *= 2
    else:
        # Last point is unpaired Nyquist freq point, don't double
        result[...,1:-1] *= 2

t = np.arange(nperseg/2, x.shape[-1] - nperseg/2 + 1, nperseg - noverlap)/float(fs)

if sides != 'twosided' and not nfft % 2:
    # get the last value correctly, it is negative otherwise
    freqs[-1] *= -1

# we unwrap the phase here to handle the onesided vs. twosided case
if mode == 'phase':
    result = np.unwrap(result, axis=-1)

result = result.astype(outdtype)

# All imaginary parts are zero anyways
if same_data and mode != 'complex':
    result = result.real

# Output is going to have new last axis for window index
if axis != -1:
    # Specify as positive axis index
    if axis < 0:
        axis = len(result.shape)-1-axis

    # Roll frequency axis back to axis where the data came from
    result = np.rollaxis(result, -1, axis)
else:
    # Make sure window/time index is last axis
    result = np.rollaxis(result, -1, -2)

return freqs, t, result

Here is the function which does the actual fft:

def _fft_helper(x, win, detrend_func, nperseg, noverlap, nfft):
"""
Calculate windowed FFT, for internal use by scipy.signal._spectral_helper
This is a helper function that does the main FFT calculation for
_spectral helper. All input valdiation is performed there, and the data
axis is assumed to be the last axis of x. It is not designed to be called
externally. The windows are not averaged over; the result from each window
is returned.
Returns
-------
result : ndarray
    Array of FFT data
References
----------
.. [1] Stack Overflow, "Repeat NumPy array without replicating data?",
    http://stackoverflow.com/a/5568169
Notes
-----
Adapted from matplotlib.mlab
.. versionadded:: 0.16.0
"""
# Created strided array of data segments
if nperseg == 1 and noverlap == 0:
    result = x[..., np.newaxis]
else:
    step = nperseg - noverlap
    shape = x.shape[:-1]+((x.shape[-1]-noverlap)//step, nperseg)
    strides = x.strides[:-1]+(step*x.strides[-1], x.strides[-1])
    result = np.lib.stride_tricks.as_strided(x, shape=shape,
                                             strides=strides)

# Detrend each data segment individually
result = detrend_func(result)

# Apply window by multiplication
result = win * result

# Perform the fft. Acts on last axis by default. Zero-pads automatically
result = fftpack.fft(result, n=nfft)
np.savetxt('result.csv', np.absolute(result), delimiter=',')


return result

Upvotes: 1

Views: 5811

Answers (1)

SleuthEye
SleuthEye

Reputation: 14577

I have the relevant source for the scipy.signal.welch method and I have isolated the relevant code down to the _spectral_helper and _fft_helper functions, but I can't identify where the averaging of the spectra takes place, or where is the best place to implement my spectra rejection operation.

Perhaps the reason you cannot find where the averaging is done in _spectral_helper and _fft_helper is because this is not where it's done, as indicated with the function documentation included in both functions:

The windows are not averaged over; the result from each window
is returned.

Instead, the averaging is performed on the result from _spectral_helper in the function csd (called from welch) with:

# Average over windows.
if len(Pxy.shape) >= 2 and Pxy.size > 0:
    if Pxy.shape[-1] > 1:
        Pxy = Pxy.mean(axis=-1)
    else:
        Pxy = np.reshape(Pxy, Pxy.shape[:-1])

Note that instead of modifying welch, you could use spectrogram, which does not do the averaging and thus leaves you the option to reject spectra as you please before you perform the averaging.

Upvotes: 3

Related Questions