chriotte
chriotte

Reputation: 39

Spectral analysis on HRV data with LombScargle in Python

I'm working with RR peaks and want to derive the frequency domain measures for HRV to recreate the results from the native C package by Physionet (WFDB tools). Both signal processing and spectral analysis are new fields for me, but after a long week with trial an error I've hacked together some code based on the Astropy module after trying several other solutions.

from astropy.stats import LombScargle
import random
dy = 0.1 * random.randint(1,100)
t = drive01["time"].values
y = drive01["intervals"].values
frequency, power = LombScargle(t, y,dy).autopower(minimum_frequency=0.0,maximum_frequency=4)

plt.plot(frequency, power)  

This creates a plot that looks quite similar to the plot from Physionets package. Matplot lib

Physionets HRV tools with the code get_hrv makes this plot enter image description here

Then by calculating common frequency domain measures I get quite different results.

Pxx = np.nan_to_num(power)
Fxx = np.nan_to_num(frequency)

ulf = 0.003
vlf = 0.04
lf = 0.15
hf = 0.4
Fs = 15.5 # the sampling rate of the drive file
# find the indexes corresponding to the VLF, LF, and HF bands
vlf_freq_band = (Fxx >= ulf) & (Fxx <= vlf)
lf_freq_band = (Fxx >= vlf) & (Fxx <= lf)
hf_freq_band = (Fxx >= lf) & (Fxx <= hf)
tp_freq_band = (Fxx >= 0) & (Fxx <= hf)
# Calculate the area under the given frequency band
dy = 1.0 / Fs
VLF = np.trapz(y=abs(Pxx[vlf_freq_band]), x=None, dx=dy)
LF = np.trapz(y=abs(Pxx[lf_freq_band]), x=None, dx=dy)
HF = np.trapz(y=abs(Pxx[hf_freq_band]), x=None, dx=dy)
TP = np.trapz(y=abs(Pxx[tp_freq_band]), x=None, dx=dy)
LF_HF = float(LF) / HF

Python

 'HF': 0.10918703853414605,
 'LF': 0.050074418080717789,
 'LF/HF': 0.45861137689028925,
 'TP': 0.20150514290250854,
 'VLF': 0.025953350304821571

From the Physionet package:

TOT PWR  = 0.0185973
VLF PWR  = 0.00372733
LF PWR   = 0.00472635
HF PWR   = 0.0101436
LF/HF    = 0.465944

When comparing the results it looks like this:

    Python  Physionet   
TP  0.201505143 0.0185973   Quite similar  + decimal dif
HF  0.109187039 0.0101436   Quite similar  + decimal dif
LF  0.050074418 0.00472635  Quite similar  + decimal dif
VLF 0.02595335  0.00372733  Not similar
LF/HF   0.458611377 0.465944    Quite similar

The calculations in Python are based on the code from another Stackoverflow post but the fix he got from the respondent is based on a python module I'm not able to get working and he is not using the Lomb Periodgram. I'm very open for trying something else as well, as long as its working with uneven samples. the data I'm working with is the drivedb from Physionet and I've used the Physionet packages to make a text file with RR peaks and time which is read into a Pandas DataFrame. The textfile can be found here

Upvotes: 1

Views: 1060

Answers (1)

Xue
Xue

Reputation: 11

LombScargle based on the Astropy caculate power different with C package by Physionet (WFDB tools). I write lombscargle again in python and result the same with C package by Physionet (WFDB tools).

import numpy as np
    import os
    import math
    import csv
    from itertools import zip_longest
    import time


    DATA_PATH = '/home/quangpc/Desktop/Data/PhysionetData/mitdb/'


    class FreqDomainClass:

        @staticmethod
        def power(freq, mag):
            lo = [0, 0.0033, 0.04, 0.15]
            hi = [0.0033, 0.04, 0.15, 0.4]
            pr = np.zeros(4)
            nbands = 4
            for index in range(0, len(freq)):
                pwr = np.power(mag[index], 2)
                for n in range(0, nbands):
                    if (freq[index] >= lo[n]) and freq[index] <= hi[n]:
                        pr[n] += pwr
                        break
            return pr

        @staticmethod
        def avevar(y):
            var = 0.0
            ep = 0.0
            ave = np.mean(y)
            for i in range(len(y)):
                s = y[i] - ave
                ep += s
                var += s * s
            var = (var - ep * ep / len(y)) / (len(y) - 1)
            return var

        def lomb(self, t, h, ofac, hifac):
            period = max(t) - min(t)
            z = h - np.mean(h)
            f = np.arange(1 / (period * ofac), hifac * len(h) / (2 * period), 1 / (period * ofac))
            f = f[:int(len(f) / 2) + 1]
            f = np.reshape(f, (len(f), -1))

            w = 2 * np.pi * f
            lenght_t = len(t)
            t = np.reshape(t, (lenght_t, -1))
            t = np.transpose(t)
            tau = np.arctan2(np.sum(np.sin(2 * w * t), axis=1), np.sum(np.cos(2 * w * t), axis=1)) / (2 * w)
            tau = np.diag(tau)
            tau = np.reshape(tau, (len(tau), -1))
            tau = np.tile(tau, (1, lenght_t))
            cos = np.cos(w * (t - tau))
            sin = np.sin(w * (t - tau))
            pc = np.power(np.sum(z * cos, axis=1), 2)
            ps = np.power(np.sum(z * sin, axis=1), 2)
            cs = pc / np.sum(np.power(cos, 2), axis=1)
            ss = ps / np.sum(np.power(sin, 2), axis=1)
            p = cs + ss

            pwr = self.avevar(h)
            nout = len(h)
            p = p / (2 * pwr)
            p = p / (nout / (2.0 * pwr))

            return f, np.sqrt(p)

        def lomb_for(self, t, h, ofac, hifac):
            period = max(t) - min(t)
            f = np.arange(1 / (period * ofac), hifac * len(h) / (2 * period), 1 / (period * ofac))
            f = f[:int(len(f) / 2) + 1]
            z = h - np.mean(h)
            p = np.zeros(len(f))
            for i in range(len(f)):
                w = 2 * np.pi * f[i]
                if w > 0:
                    twt = 2 * w * t
                    y = sum(np.sin(twt))
                    x = sum(np.cos(twt))
                    tau = math.atan2(y, x) / (2 * w)
                    wtmt = w * (t - tau)
                    cs = np.power(sum(np.multiply(z, np.cos(wtmt))), 2) / sum(np.power((np.cos(wtmt)), 2))
                    ss = np.power(sum(np.multiply(z, np.sin(wtmt))), 2) / sum(np.power((np.sin(wtmt)), 2))
                    p[i] = cs + ss
                else:
                    p[i] = np.power(sum(np.multiply(z, t)), 1) / sum(np.power(t), 1)

            pwr = self.avevar(h)
            nout = len(h)
            p = p / (2 * pwr)
            p = p / (nout / (2.0 * pwr))

            return f, np.sqrt(p)

        def freq_domain(self, time, rr_intervals):
            frequency, mag0 = self.lomb(time, rr_intervals, 4.0, 2.0)
            frequency = np.round(frequency, 8)
            mag0 = mag0 / 2.0
            mag0 = np.round(mag0, 8)
            result = self.power(frequency, mag0)
            return result[0], result[1], result[2], result[3], result[0] + result[1] + result[2] + result[3], \
                   result[2] / result[3]


    def time_domain(time, rr_intervals, ann):
        sum_rr = 0.0
        sum_rr2 = 0.0
        rmssd = 0.0
        totnn = 0
        totnnn = 0
        nrr = 1
        totrr = 1
        nnx = 0
        nnn = 0
        lastann = ann[0]
        lastrr = int(rr_intervals[0])
        lenght = 300
        t = float(time[0])
        end = t + lenght
        i = 0
        ratbuf = np.zeros(2400)
        avbuf = np.zeros(2400)
        sdbuf = np.zeros(2400)
        for x in range(1, len(ann)):
            t = float(time[x])
            while t > (end+lenght):
                i += 1
                end += lenght
            if t >= end:
                if nnn > 1:
                    ratbuf[i] = nnn/nrr
                    sdbuf[i] = np.sqrt(((sdbuf[i] - avbuf[i]*avbuf[i]/nnn) / (nnn-1)))
                    avbuf[i] /= nnn
                i += 1
                nnn = nrr = 0
                end += lenght
            nrr += 1
            totrr += 1
            if ann[x] == 'N' and ann[x-1] == 'N':
                rr_intervals[x] = int(rr_intervals[x])
                nnn += 1
                avbuf[i] += rr_intervals[x]
                sdbuf[i] += (rr_intervals[x] * rr_intervals[x])
                sum_rr += rr_intervals[x]
                sum_rr2 += (rr_intervals[x] * rr_intervals[x])
                totnn += 1
                if lastann == 'N':
                    totnnn += 1
                    rmssd += (rr_intervals[x] - lastrr) * (rr_intervals[x] - lastrr)
                    #  nndif[0] = NNDIF
                    if abs(rr_intervals[x] - lastrr) - 0.05 > (10 ** -10):
                        nnx += 1
            lastann = ann[x-1]
            lastrr = rr_intervals[x]
        if totnn == 0:
            return 0, 0, 0, 0
        sdnn = np.sqrt((sum_rr2 - sum_rr * sum_rr / totnn) / (totnn - 1))
        rmssd = np.sqrt(rmssd/totnnn)
        pnn50 = nnx / totnnn
        if nnn > 1:
            ratbuf[i] = nnn / nrr
            sdbuf[i] = np.sqrt((sdbuf[i] - avbuf[i] * avbuf[i] / nnn) / (nnn - 1))
            avbuf[i] /= nnn
        nb = i + 1
        sum_rr = 0.0
        sum_rr2 = 0.0
        k = 0
        h = 0
        while k < nb:
            if ratbuf[k] != 0:
                h += 1
                sum_rr += avbuf[k]
                sum_rr2 += (avbuf[k] * avbuf[k])
            k += 1
        sdann = np.sqrt((sum_rr2 - sum_rr * sum_rr / h) / (h - 1))
        return sdnn, sdann, rmssd, pnn50


    def get_result_from_get_hrv(filename):
        with open(filename, 'r') as f:
            csv_reader = csv.reader(f, delimiter=',')
            index = 0
            for row in csv_reader:
                if index > 0:
                    output = [s.strip() for s in row[0].split('=') if s]
                    # print('output = ', output)
                    if output[0] == 'SDNN':
                        sdnn = output[1]
                    if output[0] == 'SDANN':
                        sdann = output[1]
                    if output[0] == 'rMSSD':
                        rmssd = output[1]
                    if output[0] == 'pNN50':
                        pnn50 = output[1]
                    if output[0] == 'ULF PWR':
                        ulf = output[1]
                    if output[0] == 'VLF PWR':
                        vlf = output[1]
                    if output[0] == 'LF PWR':
                        lf = output[1]
                    if output[0] == 'HF PWR':
                        hf = output[1]
                    if output[0] == 'TOT PWR':
                        tp = output[1]
                    if output[0] == 'LF/HF':
                        ratio_lf_hf = output[1]

                index += 1

        return float(sdnn), float(sdann), float(rmssd), float(pnn50), float(ulf), float(vlf), \
               float(lf), float(hf), float(tp), float(ratio_lf_hf)


    def save_file():
        extension = "atr"

        result_all = []
        file_process = ['File']
        sdnn_l = ['sdnn']
        sdann_l = ['sdann']
        rmssd_l = ['rmssd']
        pnn50_l = ['pnn50']
        ulf_l = ['ulf']
        vlf_l = ['vlf']
        lf_l = ['lf']
        hf_l = ['hf']
        tp_l = ['tp']
        ratio_lf_hf_l = ['ratio_lf_hf']

        sdnn_l_p = ['sdnn']
        sdann_l_p = ['sdann']
        rmssd_l_p = ['rmssd']
        pnn50_l_p = ['pnn50']
        ulf_l_p = ['ulf']
        vlf_l_p = ['vlf']
        lf_l_p = ['lf']
        hf_l_p = ['hf']
        tp_l_p = ['tp']
        ratio_lf_hf_l_p = ['ratio_lf_hf']

        test_file = ['103',  '113', '117', '121', '123', '200', '202', '210', '212', '213',
                     '219', '221', '213', '228', '231', '233', '234',
                     '101', '106', '108',  '112', '114', '115', '116', '119', '122', '201', '203',
                     '205', '208', '209', '215', '220', '223', '230',
                     '105', '100']
        file_dis = ['109', '111', '118', '124', '207', '214', '232']
        for root, dirs, files in os.walk(DATA_PATH):
            files = np.sort(files)

        for name in files:
            if extension in name:
                if os.path.basename(name[:-4]) not in test_file:
                    continue
                print('Processing file...', os.path.basename(name))

                cur_dir = os.getcwd()
                os.chdir(DATA_PATH)
                os.system('rrlist {} {} -M -s >{}.rr'.format(extension, name.split('.')[0], name.split('.')[0]))

                time_m = []
                rr_intervals = []
                ann = []
                with open(name.split('.')[0] + '.rr', 'r') as rr_file:
                    for line in rr_file:
                        time_m.append(line.split(' ')[0])
                        rr_intervals.append(line.split(' ')[1])
                        ann.append(line.split(' ')[2].split('\n')[0])

                time_m = np.asarray(time_m, dtype=float)
                rr_intervals = np.asarray(rr_intervals, dtype=float)

                sdnn, sdann, rmssd, pnn50 = time_domain(time_m, rr_intervals, ann)

                if sdnn == 0 and sdann == 0 and rmssd == 0 and pnn50 == 0:
                    print('No result hrv')
                    file_dis.append(os.path.basename(name[:-4]))
                    continue

                print('sdnn', sdnn)
                print('rmssd', rmssd)
                print('pnn50', pnn50)
                print('sdann', sdann)

                time_m = time_m - time_m[0]
                time_m = np.round(time_m, 3)
                time_nn = []
                nn_intervals = []
                for i in range(1, len(ann)):
                    if ann[i] == 'N' and ann[i - 1] == 'N':
                        nn_intervals.append(rr_intervals[i])
                        time_nn.append(time_m[i])
                time_nn = np.asarray(time_nn, dtype=float)
                nn_intervals = np.asarray(nn_intervals, dtype=float)

                fc = FreqDomainClass()
                ulf, vlf, lf, hf, tp, ratio_lf_hf = fc.freq_domain(time_nn, nn_intervals)

                sdnn_l.append(sdnn)
                sdann_l.append(sdann)
                rmssd_l.append(rmssd)
                pnn50_l.append(pnn50)
                ulf_l.append(ulf)
                vlf_l.append(vlf)
                lf_l.append(lf)
                hf_l.append(hf)
                tp_l.append(tp)
                ratio_lf_hf_l.append(ratio_lf_hf)

                print('ULF PWR: ', ulf)
                print('VLF PWR: ', vlf)
                print('LF PWR: ', lf)
                print('HF PWR: ', hf)
                print('TOT PWR: ', tp)
                print('LF/HF: ', ratio_lf_hf)

                if os.path.exists('physionet_hrv.txt'):
                    os.remove('physionet_hrv.txt')
                os.system('get_hrv -R ' + name.split('.')[0] + '.rr >> ' + 'physionet_hrv.txt')
                sdnn, sdann, rmssd, pnn50, ulf, vlf, lf, hf, tp, ratio_lf_hf = \
                    get_result_from_get_hrv('physionet_hrv.txt')

                file_process.append(os.path.basename(name[:-4]))
                sdnn_l_p.append(sdnn)
                sdann_l_p.append(sdann)
                rmssd_l_p.append(rmssd)
                pnn50_l_p.append(pnn50)
                ulf_l_p.append(ulf)
                vlf_l_p.append(vlf)
                lf_l_p.append(lf)
                hf_l_p.append(hf)
                tp_l_p.append(tp)
                ratio_lf_hf_l_p.append(ratio_lf_hf)

                os.chdir(cur_dir)

        result_all.append(file_process)
        result_all.append(sdnn_l)
        result_all.append(sdnn_l_p)
        result_all.append(sdann_l)
        result_all.append(sdann_l_p)
        result_all.append(rmssd_l)
        result_all.append(rmssd_l_p)
        result_all.append(pnn50_l)
        result_all.append(pnn50_l_p)
        result_all.append(ulf_l)
        result_all.append(ulf_l_p)
        result_all.append(vlf_l)
        result_all.append(vlf_l_p)
        result_all.append(lf_l)
        result_all.append(lf_l_p)
        result_all.append(hf_l)
        result_all.append(hf_l_p)
        result_all.append(tp_l)
        result_all.append(tp_l_p)
        result_all.append(ratio_lf_hf_l)
        result_all.append(ratio_lf_hf_l_p)
        print(file_dis)
        with open('hrv2.csv', 'w+') as f:
            writer = csv.writer(f)
            for values in zip_longest(*result_all):
                writer.writerow(values)


    def main():
        extension = "atr"
        for root, dirs, files in os.walk(DATA_PATH):
            files = np.sort(files)
            for name in files:
                if extension in name:
                    if name not in ['101.atr']:
                        continue
                    cur_dir = os.getcwd()
                    os.chdir(DATA_PATH)
                    os.system('rrlist {} {} -M -s >{}.rr'.format(extension, name.split('.')[0], name.split('.')[0]))
                    time_m = []
                    rr_intervals = []
                    ann = []
                    with open(name.split('.')[0] + '.rr', 'r') as rr_file:
                        for line in rr_file:
                            time_m.append(line.split(' ')[0])
                            rr_intervals.append(line.split(' ')[1])
                            ann.append(line.split(' ')[2].split('\n')[0])
                    time_m = np.asarray(time_m, dtype=float)
                    rr_intervals = np.asarray(rr_intervals, dtype=float)
                    sdnn, sdann, rmssd, pnn50 = time_domain(time_m, rr_intervals, ann)
                    if sdnn == 0 and sdann == 0 and rmssd == 0 and pnn50 == 0:
                        print('No result hrv')
                        return 0
                    print('sdnn', sdnn)
                    print('rmssd', rmssd)
                    print('pnn50', pnn50)
                    print('sdann', sdann)

                    time_m = time_m - time_m[0]
                    time_m = np.round(time_m, 3)
                    time_nn = []
                    nn_intervals = []
                    for i in range(1, len(ann)):
                        if ann[i] == 'N' and ann[i - 1] == 'N':
                            nn_intervals.append(rr_intervals[i])
                            time_nn.append(time_m[i])
                    time_nn = np.asarray(time_nn, dtype=float)
                    nn_intervals = np.asarray(nn_intervals, dtype=float)

                    start = time.time()
                    fc = FreqDomainClass()
                    ulf, vlf, lf, hf, tp, ratio_lf_hf = fc.freq_domain(time_nn, nn_intervals)
                    end = time.time()

                    print('ULF PWR: ', ulf)
                    print('VLF PWR: ', vlf)
                    print('LF PWR: ', lf)
                    print('HF PWR: ', hf)
                    print('TOT PWR: ', tp)
                    print('LF/HF: ', ratio_lf_hf)
                    print('finish', end - start)
                    os.chdir(cur_dir)

Upvotes: 1

Related Questions