Patratacus
Patratacus

Reputation: 1791

RP2040 Pico Micropython DMA keeps hanging up after 500 bytes of data write to a WAV file on an SD Card

I have an RP2040 Pico connected to an electret mic connected to an 12-bit ADC pin on the Pico. There's also an SD Card reader connected to the SPI0 on the Pico. I use a momentary switch to trigger the recording and writing a WAV file to the SD card. The Pico is running the latest Micropython Firmware (RPI_PICO-20240602-v1.23.0.uf2). I used the DMA to grab many samples from the ADC into a buffer array and manipulated the array data to be 16-bit and recenter the 0 in the middle of the waveform before writing to a WAV file on a 64GB SD Card (FAT32 format). (The 64GB was formatted using a special FAT32 formatter program for Windows).

I'm running up against a brick wall here trying to figure out why I can't write more than 500 bytes to a WAV file before the program hangs up. The DMA seems to stop triggering after it reaches the 500 bytes write to the WAV file. If I remove the num_bytes_written = wav.write(buffer) and replace it with the # of bytes to be written to bypass the wav file write then the DMA loop runs all the way to the end. So the problem seems to be linked to the wav.write but I don't know why it seems to stuck around 500 bytes.

The SD Card writing works fine on its own if I generate a sine wave data to write to it. The DMA also works fine on its own if I just display the data and not try to write to the WAV file. However, together it doesn't seem to work. I don't think it's a speed issue since it didn't seem to have trouble writing 50-byte chunks multiple times until it reaches 500 bytes. Although the WAV file didn't close properly, it can still be opened and there are partial audio data that it managed to write.

Recorded WAV data

import machine
import time
import utime
import sdcard
import uos
import math
import random
import time, array, uctypes, rp_devices as devs

def enum(**enums: int):
    return type('Enum', (), enums)

Channel = enum(MONO=1, STEREO=2)

AUDIO_BUFFER_SIZE = 50  #number of samples

# ======= AUDIO CONFIGURATION =======
WAV_FILE = "test.wav"
RECORD_TIME_IN_SECONDS = 1
WAV_SAMPLE_SIZE_IN_BITS = 8
FORMAT = Channel.MONO
SAMPLE_RATE_IN_HZ = 1000

SD_SPI_PORT = 0
SD_SPI_SCK_PIN = 18
SD_SPI_MOSI_PIN = 19
SD_SPI_MISO_PIN = 16
SD_SPI_CS_PIN = 17
SD_SPI_BAUD_RATE = 40000000

    
def create_wav_header(sampleRate, bitsPerSample, num_channels, num_samples):
    datasize = num_samples * num_channels * bitsPerSample // 8
    o = bytes("RIFF", "ascii")  # (4byte) Marks file as RIFF
    o += (datasize + 36).to_bytes(
        4, "little"
    )  # (4byte) File size in bytes excluding this and RIFF marker
    o += bytes("WAVE", "ascii")  # (4byte) File type
    o += bytes("fmt ", "ascii")  # (4byte) Format Chunk Marker
    o += (16).to_bytes(4, "little")  # (4byte) Length of above format data
    o += (1).to_bytes(2, "little")  # (2byte) Format type (1 - PCM)
    o += (num_channels).to_bytes(2, "little")  # (2byte)
    o += (sampleRate).to_bytes(4, "little")  # (4byte)
    o += (sampleRate * num_channels * bitsPerSample // 8).to_bytes(4, "little")  # (4byte)
    o += (num_channels * bitsPerSample // 8).to_bytes(2, "little")  # (2byte)
    o += (bitsPerSample).to_bytes(2, "little")  # (2byte)
    o += bytes("data", "ascii")  # (4byte) Data Chunk Marker
    o += (datasize).to_bytes(4, "little")  # (4byte) Data size in bytes
    return o

def process_buff(adc_buffer):
    count = 0
    for sample in adc_buffer:
        # convert 12-bit to 16-bit by multiplying 2^4 and then subtract half of 16-bit to center the signal at 0
        buffer[count] = round((sample + 1) * 2**4) - 32768
        count += 1

#======================== LED and Push Button =======================
#Pico default LED
green_led = machine.Pin(25, machine.Pin.OUT)

red_led = machine.Pin(11, machine.Pin.OUT)
red_btn = machine.Pin(10, machine.Pin.IN, machine.Pin.PULL_UP)

red_last = time.ticks_ms()
red_state = False

def button_handler(pin):
    global red_last, red_btn, red_state
      
    if pin is red_btn:
        #debouncing by checking if last interrupt was more than 150 ms
        if time.ticks_diff(time.ticks_ms(), red_last) > 500:
            #red_led.toggle()
            red_state = True
            red_last = time.ticks_ms()

red_led.value(0)  #turn LED off
red_btn.irq(trigger = machine.Pin.IRQ_FALLING, handler = button_handler)

# ===================================================================

#============================ ADC & DMA Setup ============================
ADC_CHAN = 0
ADC_PIN  = 26 + ADC_CHAN

adc = devs.ADC_DEVICE
pin = devs.GPIO_PINS[ADC_PIN]
pad = devs.PAD_PINS[ADC_PIN]
pin.GPIO_CTRL_REG = devs.GPIO_FUNC_NULL
pad.PAD_REG = 0

adc.CS_REG = adc.FCS_REG = 0
adc.CS.EN = 1
adc.CS.AINSEL = ADC_CHAN

DMA_CH0 = 0
DMA_CH1 = 1

NSAMPLES = AUDIO_BUFFER_SIZE   #24000 max
RATE = SAMPLE_RATE_IN_HZ
dma_chan0 = devs.DMA_CHANS[DMA_CH0]
dma0 = devs.DMA_DEVICE

dma_chan1 = devs.DMA_CHANS[DMA_CH1]
dma1 = devs.DMA_DEVICE

adc.FCS.EN = adc.FCS.DREQ_EN = 1
adc_buff0 = array.array('H', (0 for _ in range(NSAMPLES)))
# adc_buff1 = array.array('H', (0 for _ in range(NSAMPLES)))

#adc_buff0 = bytearray(NSAMPLES * WAV_SAMPLE_SIZE_IN_BITS // 8)
adc_buff0_mv = memoryview(adc_buff0)
buffer = array.array('H', (0 for _ in range(NSAMPLES)))

# ===========================================================
# Assign chip select (CS) pin (and start it high)
cs = machine.Pin(17, machine.Pin.OUT)

# Initialize the SD card
spi=machine.SPI(0,
        baudrate=40000000,
        polarity=0,
        phase=0,
        bits=8,
        firstbit=machine.SPI.MSB,
        sck=machine.Pin(18),
        mosi=machine.Pin(19),
        miso=machine.Pin(16))

sd=sdcard.SDCard(spi, cs)

# Mount filesystem, FAT32
vfs = uos.VfsFat(sd)


while True:
    if red_state:
        red_state = False
        for i in range(2):
            red_led.value(1)
            time.sleep_ms(250)
            red_led.value(0)
            time.sleep_ms(250)
        red_led.value(1)
        
        uos.mount(vfs, "/sd")

        format_to_channels = {Channel.MONO: 1, Channel.STEREO: 2}
        NUM_CHANNELS = format_to_channels[FORMAT]
        WAV_SAMPLE_SIZE_IN_BYTES = WAV_SAMPLE_SIZE_IN_BITS // 8
        RECORDING_SIZE_IN_BYTES = (
            RECORD_TIME_IN_SECONDS * SAMPLE_RATE_IN_HZ * WAV_SAMPLE_SIZE_IN_BYTES * NUM_CHANNELS
        )
        
        print("Creating WAV File Header ...")        
        wav = open("/sd/{}".format(WAV_FILE), "wb")  #write in bytes
        wav_header = create_wav_header(
            SAMPLE_RATE_IN_HZ,
            WAV_SAMPLE_SIZE_IN_BITS,
            NUM_CHANNELS,
            SAMPLE_RATE_IN_HZ * RECORD_TIME_IN_SECONDS,
        )
        num_bytes_written = wav.write(wav_header)

        num_sample_bytes_written_to_wav = 0

        print("Sample Rate (kHz): " + str(SAMPLE_RATE_IN_HZ//1000) + ", Total Time (s): " + str(RECORD_TIME_IN_SECONDS))
        print("Recording size: {} Bytes".format(RECORDING_SIZE_IN_BYTES))
        print("==========  START RECORDING ==========")
        green_led.value(1) 
        try:
            
            #=================== Configure ADC and DMA Triggers ============
            #Section 4.9 RP2040 datasheet - set integer part of the clock and
            #ignore the fractional part ( <<8  is used for that)
            adc.DIV_REG = (48000000 // RATE - 1) << 8
            adc.FCS.THRESH = adc.FCS.OVER = adc.FCS.UNDER = 1

            dma_chan0.READ_ADDR_REG = devs.ADC_FIFO_ADDR
            dma_chan0.WRITE_ADDR_REG = uctypes.addressof(adc_buff0)
            dma_chan0.TRANS_COUNT_REG = NSAMPLES * WAV_SAMPLE_SIZE_IN_BITS // 8

            dma_chan0.CTRL_TRIG_REG = 0
            dma_chan0.CTRL_TRIG.CHAIN_TO = DMA_CH0
            dma_chan0.CTRL_TRIG.INCR_WRITE = dma_chan0.CTRL_TRIG.IRQ_QUIET = 1
            dma_chan0.CTRL_TRIG.TREQ_SEL = devs.DREQ_ADC
            #Data size. 0=byte, 1=halfword, 2=word
            dma_chan0.CTRL_TRIG.DATA_SIZE = 1   
            dma_chan0.CTRL_TRIG.EN = 1


            
            try:
                    
                #Clear down ADC FIFO so no data mixing
                while adc.FCS.LEVEL:
                    x = adc.FIFO_REG

                adc.CS.START_MANY = 1

                while num_sample_bytes_written_to_wav < RECORDING_SIZE_IN_BYTES:
                    
                    #print("Byte written: " + str(num_sample_bytes_written_to_wav) + "/" + str(RECORDING_SIZE_IN_BYTES))

                    
                    if(not dma_chan0.CTRL_TRIG.BUSY):
                        adc.CS.START_MANY = 0
                        dma_chan0.CTRL_TRIG.EN = 0
                        # Each sample is a 16-bit value or 2 bytes
                        num_bytes_read_from_mic = NSAMPLES * WAV_SAMPLE_SIZE_IN_BITS // 8 

                        num_bytes_to_write = num_bytes_read_from_mic
                        process_buff(adc_buff0)
                    
                        # write samples to WAV file
                        num_bytes_written = wav.write(buffer)
                        
                        print("Writing from buff0: " + str(num_bytes_to_write) + " bytes")
                        
                        num_sample_bytes_written_to_wav += num_bytes_written
                        
                        print("Total Byte written: " + str(num_sample_bytes_written_to_wav) + "/" + str(RECORDING_SIZE_IN_BYTES))
                        
                        
                        dma_chan0.READ_ADDR_REG = devs.ADC_FIFO_ADDR
                        dma_chan0.WRITE_ADDR_REG = uctypes.addressof(adc_buff0_mv)
                        
                        dma_chan0.CTRL_TRIG.EN = 1
                        adc.CS.START_MANY = 1                       
                    

            except (KeyboardInterrupt, Exception) as e:
                print("Exception {} {}".format(type(e).__name__, e))
                               
            adc.CS.START_MANY = 0
            dma_chan0.CTRL_TRIG.EN = 0
            
            print("==========  DONE RECORDING ==========")
        except (KeyboardInterrupt, Exception) as e:
            print("caught exception {} {}".format(type(e).__name__, e))

               
        green_led.value(0)
        red_led.value(0)
        # cleanup
        wav.close()
        uos.umount("/sd")

        print("Done")
            

Need the 2 supporting python files on the RP2040 Pico to run this.

rp_devices.py (from Jeremy P Bentham's blog) =====

from uctypes import BF_POS, BF_LEN, UINT32, BFUINT32, struct

GPIO_BASE       = 0x40014000
GPIO_CHAN_WIDTH = 0x08
GPIO_PIN_COUNT  = 30
PAD_BASE        = 0x4001c000
PAD_PIN_WIDTH   = 0x04
ADC_BASE        = 0x4004c000
DMA_BASE        = 0x50000000
DMA_CHAN_WIDTH  = 0x40
DMA_CHAN_COUNT  = 12

# DMA: RP2040 datasheet 2.5.7
DMA_CTRL_TRIG_FIELDS = {
    "AHB_ERROR":   31<<BF_POS | 1<<BF_LEN | BFUINT32,
    "READ_ERROR":  30<<BF_POS | 1<<BF_LEN | BFUINT32,
    "WRITE_ERROR": 29<<BF_POS | 1<<BF_LEN | BFUINT32,
    "BUSY":        24<<BF_POS | 1<<BF_LEN | BFUINT32,
    "SNIFF_EN":    23<<BF_POS | 1<<BF_LEN | BFUINT32,
    "BSWAP":       22<<BF_POS | 1<<BF_LEN | BFUINT32,
    "IRQ_QUIET":   21<<BF_POS | 1<<BF_LEN | BFUINT32,
    "TREQ_SEL":    15<<BF_POS | 6<<BF_LEN | BFUINT32,
    "CHAIN_TO":    11<<BF_POS | 4<<BF_LEN | BFUINT32,
    "RING_SEL":    10<<BF_POS | 1<<BF_LEN | BFUINT32,
    "RING_SIZE":    6<<BF_POS | 4<<BF_LEN | BFUINT32,
    "INCR_WRITE":   5<<BF_POS | 1<<BF_LEN | BFUINT32,
    "INCR_READ":    4<<BF_POS | 1<<BF_LEN | BFUINT32,
    "DATA_SIZE":    2<<BF_POS | 2<<BF_LEN | BFUINT32,
    "HIGH_PRIORITY":1<<BF_POS | 1<<BF_LEN | BFUINT32,
    "EN":           0<<BF_POS | 1<<BF_LEN | BFUINT32
}
# Channel-specific DMA registers
DMA_CHAN_REGS = {
    "READ_ADDR_REG":       0x00|UINT32,
    "WRITE_ADDR_REG":      0x04|UINT32,
    "TRANS_COUNT_REG":     0x08|UINT32,
    "CTRL_TRIG_REG":       0x0c|UINT32,
    "CTRL_TRIG":          (0x0c,DMA_CTRL_TRIG_FIELDS)
}
# General DMA registers
DMA_REGS = {
    "INTR":               0x400|UINT32,
    "INTE0":              0x404|UINT32,
    "INTF0":              0x408|UINT32,
    "INTS0":              0x40c|UINT32,
    "INTE1":              0x414|UINT32,
    "INTF1":              0x418|UINT32,
    "INTS1":              0x41c|UINT32,
    "TIMER0":             0x420|UINT32,
    "TIMER1":             0x424|UINT32,
    "TIMER2":             0x428|UINT32,
    "TIMER3":             0x42c|UINT32,
    "MULTI_CHAN_TRIGGER": 0x430|UINT32,
    "SNIFF_CTRL":         0x434|UINT32,
    "SNIFF_DATA":         0x438|UINT32,
    "FIFO_LEVELS":        0x440|UINT32,
    "CHAN_ABORT":         0x444|UINT32
}

# GPIO status and control: RP2040 datasheet 2.19.6.1.10
GPIO_STATUS_FIELDS = {
    "IRQTOPROC":  26<<BF_POS | 1<<BF_LEN | BFUINT32,
    "IRQFROMPAD": 24<<BF_POS | 1<<BF_LEN | BFUINT32,
    "INTOPERI":   19<<BF_POS | 1<<BF_LEN | BFUINT32,
    "INFROMPAD":  17<<BF_POS | 1<<BF_LEN | BFUINT32,
    "OETOPAD":    13<<BF_POS | 1<<BF_LEN | BFUINT32,
    "OEFROMPERI": 12<<BF_POS | 1<<BF_LEN | BFUINT32,
    "OUTTOPAD":    9<<BF_POS | 1<<BF_LEN | BFUINT32,
    "OUTFROMPERI": 8<<BF_POS | 1<<BF_LEN | BFUINT32
}
GPIO_CTRL_FIELDS = {
    "IRQOVER":    28<<BF_POS | 2<<BF_LEN | BFUINT32,
    "INOVER":     16<<BF_POS | 2<<BF_LEN | BFUINT32,
    "OEOVER":     12<<BF_POS | 2<<BF_LEN | BFUINT32,
    "OUTOVER":     8<<BF_POS | 2<<BF_LEN | BFUINT32,
    "FUNCSEL":     0<<BF_POS | 5<<BF_LEN | BFUINT32
}
GPIO_REGS = {
    "GPIO_STATUS_REG":     0x00|UINT32,
    "GPIO_STATUS":        (0x00,GPIO_STATUS_FIELDS),
    "GPIO_CTRL_REG":       0x04|UINT32,
    "GPIO_CTRL":          (0x04,GPIO_CTRL_FIELDS)
}

# PAD control: RP2040 datasheet 2.19.6.3
PAD_FIELDS = {
    "OD":          7<<BF_POS | 1<<BF_LEN | BFUINT32,
    "IE":          6<<BF_POS | 1<<BF_LEN | BFUINT32,
    "DRIVE":       4<<BF_POS | 2<<BF_LEN | BFUINT32,
    "PUE":         3<<BF_POS | 1<<BF_LEN | BFUINT32,
    "PDE":         2<<BF_POS | 1<<BF_LEN | BFUINT32,
    "SCHMITT":     1<<BF_POS | 1<<BF_LEN | BFUINT32,
    "SLEWFAST":    0<<BF_POS | 1<<BF_LEN | BFUINT32
}
PAD_REGS = {
    "PAD_REG":             0x00|UINT32,
    "PAD":                (0x00,PAD_FIELDS)
}

# ADC: RP2040 datasheet 4.9.6
ADC_CS_FIELDS = {
    "RROBIN":     16<<BF_POS | 5<<BF_LEN | BFUINT32,
    "AINSEL":     12<<BF_POS | 3<<BF_LEN | BFUINT32,
    "ERR_STICKY": 10<<BF_POS | 1<<BF_LEN | BFUINT32,
    "ERR":         9<<BF_POS | 1<<BF_LEN | BFUINT32,
    "READY":       8<<BF_POS | 1<<BF_LEN | BFUINT32,
    "START_MANY":  3<<BF_POS | 1<<BF_LEN | BFUINT32,
    "START_ONCE":  2<<BF_POS | 1<<BF_LEN | BFUINT32,
    "TS_EN":       1<<BF_POS | 1<<BF_LEN | BFUINT32,
    "EN":          0<<BF_POS | 1<<BF_LEN | BFUINT32
}
ADC_FCS_FIELDS = {
    "THRESH":     24<<BF_POS | 4<<BF_LEN | BFUINT32,
    "LEVEL":      16<<BF_POS | 4<<BF_LEN | BFUINT32,
    "OVER":       11<<BF_POS | 1<<BF_LEN | BFUINT32,
    "UNDER":      10<<BF_POS | 1<<BF_LEN | BFUINT32,
    "FULL":        9<<BF_POS | 1<<BF_LEN | BFUINT32,
    "EMPTY":       8<<BF_POS | 1<<BF_LEN | BFUINT32,
    "DREQ_EN":     3<<BF_POS | 1<<BF_LEN | BFUINT32,
    "ERR":         2<<BF_POS | 1<<BF_LEN | BFUINT32,
    "SHIFT":       1<<BF_POS | 1<<BF_LEN | BFUINT32,
    "EN":          0<<BF_POS | 1<<BF_LEN | BFUINT32,
}
ADC_REGS = {
    "CS_REG":              0x00|UINT32,
    "CS":                 (0x00,ADC_CS_FIELDS),
    "RESULT_REG":          0x04|UINT32,
    "FCS_REG":             0x08|UINT32,
    "FCS":                (0x08,ADC_FCS_FIELDS),
    "FIFO_REG":            0x0c|UINT32,
    "DIV_REG":             0x10|UINT32,
    "INTR_REG":            0x14|UINT32,
    "INTE_REG":            0x18|UINT32,
    "INTF_REG":            0x1c|UINT32,
    "INTS_REG":            0x20|UINT32
}
DREQ_PIO0_TX0, DREQ_PIO0_RX0, DREQ_PIO1_TX0 = 0, 4, 8
DREQ_PIO1_RX0, DREQ_SPI0_TX,  DREQ_SPI0_RX  = 12, 16, 17
DREQ_SPI1_TX,  DREQ_SPI1_RX,  DREQ_UART0_TX = 18, 19, 20
DREQ_UART0_RX, DREQ_UART1_TX, DREQ_UART1_RX = 21, 22, 23
DREQ_I2C0_TX,  DREQ_I2C0_RX,  DREQ_I2C1_TX  = 32, 33, 34
DREQ_I2C1_RX,  DREQ_ADC                     = 35, 36

DMA_CHANS = [struct(DMA_BASE + n*DMA_CHAN_WIDTH, DMA_CHAN_REGS) for n in range(0,DMA_CHAN_COUNT)]
DMA_DEVICE = struct(DMA_BASE, DMA_REGS)
GPIO_PINS = [struct(GPIO_BASE + n*GPIO_CHAN_WIDTH, GPIO_REGS) for n in range(0,GPIO_PIN_COUNT)]
PAD_PINS =  [struct(PAD_BASE + n*PAD_PIN_WIDTH, PAD_REGS) for n in range(0,GPIO_PIN_COUNT)]
ADC_DEVICE = struct(ADC_BASE, ADC_REGS)
ADC_FIFO_ADDR = ADC_BASE + 0x0c

GPIO_FUNC_SPI, GPIO_FUNC_UART, GPIO_FUNC_I2C = 1, 2, 3
GPIO_FUNC_PWM, GPIO_FUNC_SIO, GPIO_FUNC_PIO0 = 4, 5, 6
GPIO_FUNC_NULL = 0x1f

# EOF

sdcard.py

from micropython import const
import time


_CMD_TIMEOUT = const(100)

_R1_IDLE_STATE = const(1 << 0)
# R1_ERASE_RESET = const(1 << 1)
_R1_ILLEGAL_COMMAND = const(1 << 2)
# R1_COM_CRC_ERROR = const(1 << 3)
# R1_ERASE_SEQUENCE_ERROR = const(1 << 4)
# R1_ADDRESS_ERROR = const(1 << 5)
# R1_PARAMETER_ERROR = const(1 << 6)
_TOKEN_CMD25 = const(0xFC)
_TOKEN_STOP_TRAN = const(0xFD)
_TOKEN_DATA = const(0xFE)


class SDCard:
    def __init__(self, spi, cs, baudrate=1320000):
        self.spi = spi
        self.cs = cs

        self.cmdbuf = bytearray(6)
        self.dummybuf = bytearray(512)
        self.tokenbuf = bytearray(1)
        for i in range(512):
            self.dummybuf[i] = 0xFF
        self.dummybuf_memoryview = memoryview(self.dummybuf)

        # initialise the card
        self.init_card(baudrate)

    def init_spi(self, baudrate):
        try:
            master = self.spi.MASTER
        except AttributeError:
            # on ESP8266
            self.spi.init(baudrate=baudrate, phase=0, polarity=0)
        else:
            # on pyboard
            self.spi.init(master, baudrate=baudrate, phase=0, polarity=0)

    def init_card(self, baudrate):
        # init CS pin
        self.cs.init(self.cs.OUT, value=1)

        # init SPI bus; use low data rate for initialisation
        self.init_spi(100000)

        # clock card at least 100 cycles with cs high
        for i in range(16):
            self.spi.write(b"\xff")

        # CMD0: init card; should return _R1_IDLE_STATE (allow 5 attempts)
        for _ in range(5):
            if self.cmd(0, 0, 0x95) == _R1_IDLE_STATE:
                break
        else:
            raise OSError("no SD card")

        # CMD8: determine card version
        r = self.cmd(8, 0x01AA, 0x87, 4)
        if r == _R1_IDLE_STATE:
            self.init_card_v2()
        elif r == (_R1_IDLE_STATE | _R1_ILLEGAL_COMMAND):
            self.init_card_v1()
        else:
            raise OSError("couldn't determine SD card version")

        # get the number of sectors
        # CMD9: response R2 (R1 byte + 16-byte block read)
        if self.cmd(9, 0, 0, 0, False) != 0:
            raise OSError("no response from SD card")
        csd = bytearray(16)
        self.readinto(csd)
        if csd[0] & 0xC0 == 0x40:  # CSD version 2.0
            self.sectors = ((csd[8] << 8 | csd[9]) + 1) * 1024
        elif csd[0] & 0xC0 == 0x00:  # CSD version 1.0 (old, <=2GB)
            c_size = (csd[6] & 0b11) << 10 | csd[7] << 2 | csd[8] >> 6
            c_size_mult = (csd[9] & 0b11) << 1 | csd[10] >> 7
            read_bl_len = csd[5] & 0b1111
            capacity = (c_size + 1) * (2 ** (c_size_mult + 2)) * (2**read_bl_len)
            self.sectors = capacity // 512
        else:
            raise OSError("SD card CSD format not supported")
        # print('sectors', self.sectors)

        # CMD16: set block length to 512 bytes
        if self.cmd(16, 512, 0) != 0:
            raise OSError("can't set 512 block size")

        # set to high data rate now that it's initialised
        self.init_spi(baudrate)

    def init_card_v1(self):
        for i in range(_CMD_TIMEOUT):
            time.sleep_ms(50)
            self.cmd(55, 0, 0)
            if self.cmd(41, 0, 0) == 0:
                # SDSC card, uses byte addressing in read/write/erase commands
                self.cdv = 512
                # print("[SDCard] v1 card")
                return
        raise OSError("timeout waiting for v1 card")

    def init_card_v2(self):
        for i in range(_CMD_TIMEOUT):
            time.sleep_ms(50)
            self.cmd(58, 0, 0, 4)
            self.cmd(55, 0, 0)
            if self.cmd(41, 0x40000000, 0) == 0:
                self.cmd(58, 0, 0, -4)  # 4-byte response, negative means keep the first byte
                ocr = self.tokenbuf[0]  # get first byte of response, which is OCR
                if not ocr & 0x40:
                    # SDSC card, uses byte addressing in read/write/erase commands
                    self.cdv = 512
                else:
                    # SDHC/SDXC card, uses block addressing in read/write/erase commands
                    self.cdv = 1
                # print("[SDCard] v2 card")
                return
        raise OSError("timeout waiting for v2 card")

    def cmd(self, cmd, arg, crc, final=0, release=True, skip1=False):
        self.cs(0)

        # create and send the command
        buf = self.cmdbuf
        buf[0] = 0x40 | cmd
        buf[1] = arg >> 24
        buf[2] = arg >> 16
        buf[3] = arg >> 8
        buf[4] = arg
        buf[5] = crc
        self.spi.write(buf)

        if skip1:
            self.spi.readinto(self.tokenbuf, 0xFF)

        # wait for the response (response[7] == 0)
        for i in range(_CMD_TIMEOUT):
            self.spi.readinto(self.tokenbuf, 0xFF)
            response = self.tokenbuf[0]
            if not (response & 0x80):
                # this could be a big-endian integer that we are getting here
                # if final<0 then store the first byte to tokenbuf and discard the rest
                if final < 0:
                    self.spi.readinto(self.tokenbuf, 0xFF)
                    final = -1 - final
                for j in range(final):
                    self.spi.write(b"\xff")
                if release:
                    self.cs(1)
                    self.spi.write(b"\xff")
                return response

        # timeout
        self.cs(1)
        self.spi.write(b"\xff")
        return -1

    def readinto(self, buf):
        self.cs(0)

        # read until start byte (0xff)
        for i in range(_CMD_TIMEOUT):
            self.spi.readinto(self.tokenbuf, 0xFF)
            if self.tokenbuf[0] == _TOKEN_DATA:
                break
            time.sleep_ms(1)
        else:
            self.cs(1)
            raise OSError("timeout waiting for response")

        # read data
        mv = self.dummybuf_memoryview
        if len(buf) != len(mv):
            mv = mv[: len(buf)]
        self.spi.write_readinto(mv, buf)

        # read checksum
        self.spi.write(b"\xff")
        self.spi.write(b"\xff")

        self.cs(1)
        self.spi.write(b"\xff")

    def write(self, token, buf):
        self.cs(0)

        # send: start of block, data, checksum
        self.spi.read(1, token)
        self.spi.write(buf)
        self.spi.write(b"\xff")
        self.spi.write(b"\xff")

        # check the response
        if (self.spi.read(1, 0xFF)[0] & 0x1F) != 0x05:
            self.cs(1)
            self.spi.write(b"\xff")
            return

        # wait for write to finish
        while self.spi.read(1, 0xFF)[0] == 0:
            pass

        self.cs(1)
        self.spi.write(b"\xff")

    def write_token(self, token):
        self.cs(0)
        self.spi.read(1, token)
        self.spi.write(b"\xff")
        # wait for write to finish
        while self.spi.read(1, 0xFF)[0] == 0x00:
            pass

        self.cs(1)
        self.spi.write(b"\xff")

    def readblocks(self, block_num, buf):
        # workaround for shared bus, required for (at least) some Kingston
        # devices, ensure MOSI is high before starting transaction
        self.spi.write(b"\xff")

        nblocks = len(buf) // 512
        assert nblocks and not len(buf) % 512, "Buffer length is invalid"
        if nblocks == 1:
            # CMD17: set read address for single block
            if self.cmd(17, block_num * self.cdv, 0, release=False) != 0:
                # release the card
                self.cs(1)
                raise OSError(5)  # EIO
            # receive the data and release card
            self.readinto(buf)
        else:
            # CMD18: set read address for multiple blocks
            if self.cmd(18, block_num * self.cdv, 0, release=False) != 0:
                # release the card
                self.cs(1)
                raise OSError(5)  # EIO
            offset = 0
            mv = memoryview(buf)
            while nblocks:
                # receive the data and release card
                self.readinto(mv[offset : offset + 512])
                offset += 512
                nblocks -= 1
            if self.cmd(12, 0, 0xFF, skip1=True):
                raise OSError(5)  # EIO

    def writeblocks(self, block_num, buf):
        # workaround for shared bus, required for (at least) some Kingston
        # devices, ensure MOSI is high before starting transaction
        self.spi.write(b"\xff")

        nblocks, err = divmod(len(buf), 512)
        assert nblocks and not err, "Buffer length is invalid"
        if nblocks == 1:
            # CMD24: set write address for single block
            if self.cmd(24, block_num * self.cdv, 0) != 0:
                raise OSError(5)  # EIO

            # send the data
            self.write(_TOKEN_DATA, buf)
        else:
            # CMD25: set write address for first block
            if self.cmd(25, block_num * self.cdv, 0) != 0:
                raise OSError(5)  # EIO
            # send the data
            offset = 0
            mv = memoryview(buf)
            while nblocks:
                self.write(_TOKEN_CMD25, mv[offset : offset + 512])
                offset += 512
                nblocks -= 1
            self.write_token(_TOKEN_STOP_TRAN)

    def ioctl(self, op, arg):
        if op == 4:  # get number of blocks
            return self.sectors
        if op == 5:  # get block size in bytes
            return 512

Upvotes: 0

Views: 158

Answers (0)

Related Questions