Tachyon

Reputation: 121

Python delays on Raspberry Pi

I'm trying to simulate a compound action potential for calibrating research instruments. The goal is to output a specific 10 µV signal at 250 Hz. The low voltage will be dealt with later; the main problem for me is the frequency. The picture below shows an overview of the system I'm trying to make.

[System overview diagram]

By acquiring data from a live animal and processing it in MATLAB, I've made a low-noise signal with 789 values in 12-bit format. I stored it in CSV format in a repository and cloned that to a Raspberry Pi using Git. Below is the Python script I've written on the RPi; you can skip to def main in the script to see the functionality.

#!/usr/bin/python

import spidev
from time import sleep
import RPi.GPIO as GPIO
import csv
import sys
import math

DEBUG = False
spi_max_speed = 20 * 1000000 # 20 MHz, the MCP4921's maximum SPI clock
V_Ref = 5000 # 5 V reference, in mV
Resolution = 2**12 # 12-bit DAC
data_points = 789 # Number of samples in the csv file
CE = 0 # CE0 or CE1, select SPI device on bus

spi = spidev.SpiDev()
spi.open(0,CE)
spi.max_speed_hz = spi_max_speed

LDAQ = 22
GPIO.setmode(GPIO.BOARD)
GPIO.setup(LDAQ, GPIO.OUT)
GPIO.output(LDAQ,GPIO.LOW)

def setOutput(val):
    #Build the two SPI bytes per the MCP4921 data sheet: the high nibble is
    #0 (write), 0 (unbuffered Vref), 1 (gain 1x), 1 (output active)
    lowByte = val & 0b11111111
    highByte = ((val >> 8) & 0xff) | 0b0 << 7 | 0b0 << 6 | 0b1 << 5 | 0b1 << 4
    if DEBUG :
        print("Highbyte = {0:8b}".format(highByte))
        print("Lowbyte =  {0:8b}".format(lowByte))
    spi.xfer2([highByte, lowByte])

def main():
    with open('signal12bit.csv') as signal:
        signal_length = float(raw_input("Please input signal length in ms: "))
        delay = float(raw_input("Please input delay after signal in ms: "))
        amplitude = float(raw_input("Please input signal amplitude in mV: "))
        print "Starting Simulant with signal length %.1f ms, delay %.1f ms and amplitude %.1f mV." % (signal_length, delay, amplitude)
        if not DEBUG : print "Press ctrl+c to close."
        sleep (1) #Wait a sec before starting
        read = csv.reader(signal, delimiter=' ', quotechar='|')
        try:
            while(True):
                signal.seek(0)
                for row in read: #Loop over csv file rows
                    if DEBUG : print ', '.join(row)
                    setOutput(int(row[0])/int(V_Ref/amplitude)) #Adjust amplitude, not strictly necessary in software
                    sleep(signal_length/(data_points*1000)) #Divide by 1000 to convert ms to s, spread over all data points
                sleep (delay/1000)
        except (KeyboardInterrupt, Exception) as e:
            print(e)
            print "Closing SPI channel"
            setOutput(0)
            GPIO.cleanup()
            spi.close()

if __name__ == '__main__':
    main()

This script almost works as intended. Connecting the output pin of an MCP4921 DAC to an oscilloscope shows that it reproduces the signal very well, and it outputs the subsequent delay correctly.

Unfortunately, the data points come out much further apart than I need them to. The shortest time I can cram the signal into is about 79 ms, while at 250 Hz the whole 789-point signal has to fit into 4 ms (roughly 5 µs per point). The per-point sleep of signal_length/789000 seconds is shorter than Python on the Pi can honor, and reading the csv file inside the loop takes time on top of that. However, if I make an array manually and output those values instead of reading the csv file, I can achieve a frequency of over 6 kHz with no loss.
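
For reference, parsing the file once into a plain list up front (instead of hard-coding the array) should give the same speedup; a minimal sketch, using the same delimiter and quotechar as the script above:

import csv

#Parse the csv once at startup; the playback loop then iterates
#over a plain Python list and never touches the file again.
with open('signal12bit.csv') as f:
    reader = csv.reader(f, delimiter=' ', quotechar='|')
    samples = [int(row[0]) for row in reader] #789 12-bit values

print "Loaded %d samples" % len(samples)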

My question is this:

How can I get this signal to appear at a frequency of 250 Hz, and lower it reliably from the user's input? I've thought about writing the 789 values into an array in the script by hand, and then changing the SPI speed to whatever value fits 250 Hz. This would eliminate the slow csv reader function, but then the frequency can't be reduced from user input. In any case, eliminating the need for csv.read would help a lot. Thanks!

Upvotes: 4

Views: 2885

Answers (1)

Tachyon

Reputation: 121

Figured it out earlier today, so I thought I'd post an answer here, in case someone comes upon a similar problem in the future.

The delay between data points cannot be produced with sleep(): its resolution and scheduling jitter on a non-realtime OS are far coarser than the microsecond-scale gaps needed here, and the per-call overhead in Python adds more on top. What I ended up doing was the following:

  • Move all math and function calls out of the critical loop
  • Run a linear regression on the time it takes to transfer the values with no delay (see the sketch after this list)
  • Increase the number of data points in the CSV file to "plenty" (9600) in MATLAB
  • Calculate the number of points needed to meet the user's desired signal length
  • Take evenly separated entries from the now-larger CSV file to match that number of points as closely as possible
  • Scale these values by the amplitude and build the SPI bytes explicitly
  • Save the two byte lists and output them directly in the critical loop
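
The timing calibration in the second step isn't shown in the final script, but the idea is straightforward: fire bursts of transfers with no sleep at all, time them, and fit a line of total time against point count. A rough sketch (numpy is assumed here, the burst sizes are arbitrary, and the fitted constants will differ from machine to machine):

import time
import numpy as np
import spidev

spi = spidev.SpiDev()
spi.open(0, 0)
spi.max_speed_hz = 16 * 1000000

#Time N back-to-back transfers for several N, then fit a line:
#total_us ~= slope*N + intercept.
counts = [500, 1000, 2000, 4000, 8000]
times_us = []
for n in counts:
    start = time.time()
    for _ in range(n):
        spi.xfer2([0x30, 0x00]) #valid MCP4921 frame, data = 0
    times_us.append((time.time() - start) * 1e6)

slope, intercept = np.polyfit(counts, times_us, 1)
print "~%.4f us per point, ~%.4f us fixed overhead" % (slope, intercept)

A fit like this is where the 12.3291 and 24.6418 used below come from; with those numbers a 4 ms signal works out to (4000 - 24.6418)/12.3291 ≈ 322 points, so the 9600-point file is downsampled by a step of about 30.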

The new code, with a bit of input checking, is below

#!/usr/bin/python

import spidev
from time import sleep
import RPi.GPIO as GPIO
import sys
import csv
import ast

spi_max_speed = 16 * 1000000 # 16 MHz
V_Ref = 5000 # 5V in mV
Resolution = 2**12 # 12 bits for the MCP4921
CE = 0 # CE0 or CE1, select SPI device on bus
total_data_points = 9600 #CSV file length

spi = spidev.SpiDev()
spi.open(0,CE)
spi.max_speed_hz = spi_max_speed

LDAQ=22
GPIO.setmode(GPIO.BOARD)
GPIO.setup(LDAQ, GPIO.OUT)
GPIO.output(LDAQ,GPIO.LOW)

def main():

    #User inputs and checking for digits
    signalLengthU = raw_input("Input signal length in ms, minimum 4: ")
    if signalLengthU.isdigit():
        signalLength = signalLengthU
    else:
        signalLength = 4

    delayU = raw_input("Input delay after signal in ms: ")
    if delayU.isdigit():
        delay = delayU
    else:
        delay = 0

    amplitudeU = raw_input("Input signal amplitude in mV, between 1 and 5000: ")
    if amplitudeU.isdigit():
        amplitude = amplitudeU
    else:
        amplitude = 5000

    #Calculate data points, delay, and amplitude
    #12.3291 us per point and 24.6418 us of fixed overhead come from the timing regression
    data_points = int((1000*float(signalLength)-24.6418)/12.3291)
    signalDelay = float(delay)/1000
    setAmplitude = V_Ref/float(amplitude)

    #Load and save CSV file
    datain = open('signal12bit.csv')
    read = csv.reader(datain, delimiter=' ', quotechar='|')
    signal = []
    for row in read:
        signal.append(ast.literal_eval(row[0]))

    #Downsampling to achieve desired signal length
    downsampling = int(round(total_data_points/data_points))
    signalSpeed = signal[0::downsampling]
    listlen = len(signalSpeed)

    #Construction of SPI bytes, to avoid calling functions in critical loop
    lowByte = []
    highByte = []
    for i in signalSpeed:
        lowByte.append(int(i/setAmplitude) & 0b11111111)
        highByte.append(((int(i/setAmplitude) >> 8) & 0xff) | 0b0 << 7 | 0b0 << 6 | 0b1 << 5 | 0b1 << 4)

    print "Starting Simulant with signal length %s ms, delay %s ms and amplitude %s mV." % (signalLength, delay, amplitude)
    print "Press ctrl+c to stop."
    sleep (1)

    try:
        while(True): #Main loop
            for i in range(listlen):
                spi.xfer2([highByte[i],lowByte[i]]) #Critical loop, no delay!
            sleep (signalDelay)
    except (KeyboardInterrupt, Exception) as e:
        print e
        print "Closing SPI channel"
        lowByte = 0 & 0b11111111 #Build a zero-output frame to silence the DAC
        highByte = ((0 >> 8) & 0xff) | 0b0 << 7 | 0b0 << 6 | 0b1 << 5 | 0b1 << 4
        spi.xfer2([highByte, lowByte])
        GPIO.cleanup()
        spi.close()

if __name__ == '__main__':
    main()

The result is exactly what I wanted. Below is an example from the oscilloscope with a signal length of 5 ms, i.e. 200 Hz. Thanks for your help, guys!

[Oscilloscope reading]

Upvotes: 1
