Ahmed K. Moustafa

Reputation: 113

Load the MIT-BIH Normal Sinus Rhythm Database in Python

I am trying to load the MIT-BIH Normal Sinus Rhythm Database (nsrdb) in Python. I checked this tutorial:

https://medium.com/@roszcz/machine-learning-for-medicine-qrs-detection-in-a-single-channel-ecg-signal-part-1-data-set-be36f70bbd38

However, it is for loading the MIT-BIH Arrhythmia Database. The code from the mitdb.py file in the datasets folder is shown below. How can I adjust the get_records() function to download other PhysioNet databases as well, such as the nsrdb?

Link to the PhysioNet database: https://www.physionet.org/physiobank/database/nsrdb/

import os
import h5py
import wfdb as wf
import numpy as np
import pandas as pd
from glob import glob
from scipy import signal as ss
from utils import download as ud
from matplotlib import pyplot as plt

def get_records():
    """ Get paths for data in data/mit/ directory """
    # Download if doesn't exist
    if not os.path.isdir('data/mitdb'):
        print 'Downloading the mitdb ecg database, please wait'
        ud.download_mitdb()
        print 'Download finished'

    # There are 3 files for each record
    # *.atr is one of them
    paths = glob('data/mitdb/*.atr')

    # Get rid of the extension
    paths = [path[:-4] for path in paths]
    paths.sort()

    return paths

def good_types():
    """ Of annotations """
    # www.physionet.org/physiobank/annotations.shtml
    good = ['N', 'L', 'R', 'B', 'A',
            'a', 'J', 'S', 'V', 'r',
            'F', 'e', 'j', 'n', 'E',
            '/', 'f', 'Q', '?']

    return good

def beat_annotations(annotation):
    """ Get rid of non-beat markers """
    # Declare beat types
    good = good_types()
    ids = np.in1d(annotation.anntype, good)

    # We want to know only the positions
    beats = annotation.annsamp[ids]

    return beats

def convert_input(channel, annotation):
    """ Into output """
    # Remove non-beat annotations
    beats = beat_annotations(annotation)

    # Create dirac-comb signal
    dirac = np.zeros_like(channel)
    dirac[beats] = 1.0

    # Use hamming window as a bell-curve filter
    width = 36
    filter = ss.hamming(width)
    gauss = np.convolve(filter, dirac, mode = 'same')

    return dirac, gauss

def good_annotations():
    """ Const function with good annotations """
    # For now it seems those are most popular
    good_annotations = [1, 2, 3, 4,
                        5, 6, 7, 8,
                        9, 10, 11, 12,
                        13, 16, 31, 38]

    return good_annotations

def make_dataset(records, width, savepath):
    """ Inside an array """
    # Prepare containers
    signals, labels = [], []

    # Iterate files
    for path in records:
        print 'Processing file:', path
        record = wf.rdsamp(path)
        annotations = wf.rdann(path, 'atr')

        # Extract pure signals
        data = record.p_signals

        # Convert each channel into labeled fragments
        signal, label = convert_data(data, annotations, width)

        # Cumulate
        signals.append(signal)
        labels.append(label)

    # Convert to one huge numpy.array
    signals = np.vstack(signals)
    labels = np.vstack(labels)

    # Write to disk
    np.save(savepath, {'signals' : signals,
                       'labels'  : labels })

def convert_data(data, annotations, width):
    """ Into a batch """
    # Prepare containers
    signals, labels = [], []

    # Convert both channels
    for it in range(2):
        channel = data[:, it]
        dirac, gauss = convert_input(channel,
                                     annotations)
        # Merge labels
        label = np.vstack([dirac, gauss])

        # Prepare the moving window
        sta = 0
        end = width
        stride = width
        while end <= len(channel):
            # Chop out the fragments
            s_frag = channel[sta : end]
            l_frag = label[:, sta : end]

            # Cumulate
            signals.append(s_frag)
            labels.append(l_frag)

            # Go forth
            sta += stride
            end += stride

    # Turn into arrays
    signals = np.array(signals)
    labels = np.array(labels)

    return signals, labels

def create_datasets():
    """ Training, validation, test """
    # Prepare paths
    records = get_records()

    # Shuffle up determinitically
    np.random.seed(666)
    np.random.shuffle(records)

    # Define the data
    width = 200

    # Make training
    make_dataset(records[:30], width, 'data/training')

    # ... validation ...
    make_dataset(records[30 : 39], width, 'data/validation')

    # ... and test
    make_dataset(records[39 : 48], width, 'data/test')

Upvotes: 2

Views: 3992

Answers (2)

Shreyas Murali

Reputation: 436

The source code of the function ud.download_mitdb() is here.

Excerpt (disclaimer: I couldn't find a license attached to the git repo. I am assuming the original author is okay with it being quoted here for reference; if not, please remove the code below):

import os
import urllib2
import requests
from tqdm import tqdm
from bs4 import BeautifulSoup as BSoup

def download_mitdb():
    """ All """
    extensions = ['atr', 'dat', 'hea']
    the_path = 'https://www.physionet.org/physiobank/database/mitdb/'

    # Save to proper data/ directory
    savedir = 'data/mitdb'
    if not os.path.exists(savedir):
        os.makedirs(savedir)

    # With this format
    savename = savedir + '/{}.{}'

    # Find all interesting files on that site:
    soup = BSoup(urllib2.urlopen(the_path).read())

    # Find all links pointing to .dat files
    hrefs = []
    for a in soup.find_all('a', href=True):
        href = a['href']
        # Download datafiles with markers given
        if href[-4::] == '.dat':
            hrefs.append(href[:-4])

    # Path to the file on the internet
    down_path = the_path + '{}.{}'

    for data_id in hrefs:
        for ext in extensions:
            webpath = down_path.format(data_id, ext)
            datafile = urllib2.urlopen(webpath)

            # Save locally
            filepath = savename.format(data_id, ext)
            with open(filepath, 'wb') as out:
                out.write(datafile.read())

    print 'Downloaded {} data files'.format(len(hrefs))

def download_qt():
    """ All """
    extensions = ['atr', 'dat', 'hea',
                  'man', 'q1c', 'q2c',
                  'qt1', 'qt2', 'pu', 'pu0', 'pu1']
    the_path = 'https://www.physionet.org/physiobank/database/qtdb/'

    # Save to proper data/ directory
    savedir = 'data/qt'
    if not os.path.exists(savedir):
        os.makedirs(savedir)

    # With this format
    savename = savedir + '/{}.{}'

    # Find all interesting files on that site:
    soup = BSoup(urllib2.urlopen(the_path).read())

    # Find all links pointing to .dat files
    hrefs = []
    for a in soup.find_all('a', href=True):
        href = a['href']
        # Download datafiles with markers given
        if href[-4::] == '.dat':
            hrefs.append(href[:-4])

    # Path to the file on the internet
    down_path = the_path + '{}.{}'

    for data_id in hrefs:
        for ext in extensions:
            webpath = down_path.format(data_id, ext)
            try:
                datafile = urllib2.urlopen(webpath)

                # Save locally
                filepath = savename.format(data_id, ext)
                with open(filepath, 'wb') as out:
                    out.write(datafile.read())

            # Assuming that 404 (Not Found)
            # is the only one possible http error
            except urllib2.HTTPError:
                print 'Not available:', webpath

    print 'Downloaded {} data files'.format(len(hrefs))


if __name__ == '__main__':
    download_mitdb()

As you can see, the_path is hardcoded. You could either tweak your copy to fetch the data from other URLs, or pass the URL (or database name) in as an argument that defaults to this one, as in the sketch below.
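
For reference, here is a minimal Python 3 sketch of that second option. The function name download_physionet and its defaults are my own; it simply mirrors the structure of download_mitdb() above and assumes the physiobank URL layout used in the quoted code still resolves:

import os
import requests
from bs4 import BeautifulSoup as BSoup

def download_physionet(db_name='mitdb', extensions=('atr', 'dat', 'hea')):
    """ Download every record of a PhysioBank database (hypothetical helper). """
    the_path = 'https://www.physionet.org/physiobank/database/{}/'.format(db_name)

    # Save to data/<db_name>, mirroring the original data/mitdb layout
    savedir = 'data/{}'.format(db_name)
    if not os.path.exists(savedir):
        os.makedirs(savedir)

    # Scrape the index page for links to .dat files, exactly like the original
    soup = BSoup(requests.get(the_path).text, 'html.parser')
    record_ids = [a['href'][:-4] for a in soup.find_all('a', href=True)
                  if a['href'].endswith('.dat')]

    for record_id in record_ids:
        for ext in extensions:
            url = the_path + '{}.{}'.format(record_id, ext)
            response = requests.get(url)
            # Not every record has every extension; skip missing files
            if response.status_code != 200:
                print('Not available:', url)
                continue
            with open('{}/{}.{}'.format(savedir, record_id, ext), 'wb') as out:
                out.write(response.content)

    print('Downloaded {} records'.format(len(record_ids)))

Calling download_physionet('nsrdb') should then fetch the Normal Sinus Rhythm records into data/nsrdb, since nsrdb uses the same .atr/.dat/.hea files per record.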

Upvotes: 0

Chaoste

Reputation: 564

If someone is still wondering: there is a Python wrapper for the WFDB Software Project that loads waveform data from PhysioNet (both from local files and remotely).

Here is an example of how to use it: https://github.com/MIT-LCP/wfdb-python/blob/master/demo.ipynb

To load a sample, you can run the following lines after installing wfdb:

import wfdb
record = wfdb.rdrecord('16265', pb_dir='nsrdb/')
wfdb.plot_wfdb(record=record, title='Record 16265 from Physionet NSRDB') 
print(record.__dict__)
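
(Side note: in more recent releases of the wfdb package the keyword appears to have been renamed from pb_dir to pn_dir, so with a current version the equivalent call would presumably be:)

import wfdb

# Assumption: newer wfdb releases use pn_dir instead of pb_dir
record = wfdb.rdrecord('16265', pn_dir='nsrdb')
print(record.fs, record.p_signal.shape)  # sampling rate and (samples, channels)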

To download the whole database, use

wfdb.dl_database('nsrdb', 'data/nsrdb')
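
Once the files are on disk, the get_records() helper from the question only needs a different directory and download call; a minimal sketch (the name get_nsrdb_records is mine) could look like this:

import os
from glob import glob
import wfdb

def get_nsrdb_records(datadir='data/nsrdb'):
    """ Download the nsrdb once, then return the local record paths. """
    if not os.path.isdir(datadir):
        wfdb.dl_database('nsrdb', datadir)

    # Each record has a .hea header file; strip the extension to get the record path
    return sorted(path[:-4] for path in glob(os.path.join(datadir, '*.hea')))

Each returned path can then be passed to wfdb.rdrecord() (and, for the beat annotations, wfdb.rdann(path, 'atr')) just like the remote example above.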

Upvotes: 1
