Reputation: 1
I'm trying to run a python code for an awr1642BOOST board to make a short range radar. since i dont have an dca1000 evm data capture board I'm trying to obtain the detected data points using the detObj dictionary of the provided python code. On running the the code the 2D scatter window appears blank and remains unresponsive. Please can someone point out what i might be doing wrong. I have also attached a screen shot of the error window and the python shell.[enter image description here][1]
import time
import numpy as np
import pyqtgraph as pg
from pyqtgraph.Qt import QtGui
# Change the configuration file name
configFileName = '1642config.cfg'
CLIport = {}
Dataport = {}
byteBuffer = np.zeros(2**15,dtype = 'uint8')
byteBufferLength = 0;
# ------------------------------------------------------------------
# Function to configure the serial ports and send the data from
# the configuration file to the radar
def serialConfig(configFileName):
global CLIport
global Dataport
# Open the serial ports for the configuration and the data ports
# Raspberry pi
#CLIport = serial.Serial('/dev/ttyACM0', 115200)
#Dataport = serial.Serial('/dev/ttyACM1', 921600)
# Windows
CLIport = serial.Serial('COM4', 115200)
Dataport = serial.Serial('COM5', 921600)
# Read the configuration file and send it to the board
config = [line.rstrip('\r\n') for line in open(configFileName)]
for i in config:
CLIport.write((i+'\n').encode())
print(i)
time.sleep(0.01)
return CLIport, Dataport
# ------------------------------------------------------------------
# Function to parse the data inside the configuration file
def parseConfigFile(configFileName):
configParameters = {} # Initialize an empty dictionary to store the configuration parameters
# Read the configuration file and send it to the board
config = [line.rstrip('\r\n') for line in open(configFileName)]
for i in config:
# Split the line
splitWords = i.split(" ")
# Hard code the number of antennas, change if other configuration is used
numRxAnt = 4
numTxAnt = 2
# Get the information about the profile configuration
if "profileCfg" in splitWords[0]:
startFreq = int(float(splitWords[2]))
idleTime = int(splitWords[3])
rampEndTime = float(splitWords[5])
freqSlopeConst = float(splitWords[8])
numAdcSamples = int(splitWords[10])
numAdcSamplesRoundTo2 = 1;
while numAdcSamples > numAdcSamplesRoundTo2:
numAdcSamplesRoundTo2 = numAdcSamplesRoundTo2 * 2;
digOutSampleRate = int(splitWords[11]);
# Get the information about the frame configuration
elif "frameCfg" in splitWords[0]:
chirpStartIdx = int(splitWords[1]);
chirpEndIdx = int(splitWords[2]);
numLoops = int(splitWords[3]);
numFrames = int(splitWords[4]);
framePeriodicity = int(splitWords[5]);
# Combine the read data to obtain the configuration parameters
numChirpsPerFrame = (chirpEndIdx - chirpStartIdx + 1) * numLoops
configParameters["numDopplerBins"] = numChirpsPerFrame / numTxAnt
configParameters["numRangeBins"] = numAdcSamplesRoundTo2
configParameters["rangeResolutionMeters"] = (3e8 * digOutSampleRate * 1e3) / (2 * freqSlopeConst * 1e12 * numAdcSamples)
configParameters["rangeIdxToMeters"] = (3e8 * digOutSampleRate * 1e3) / (2 * freqSlopeConst * 1e12 * configParameters["numRangeBins"])
configParameters["dopplerResolutionMps"] = 3e8 / (2 * startFreq * 1e9 * (idleTime + rampEndTime) * 1e-6 * configParameters["numDopplerBins"] * numTxAnt)
configParameters["maxRange"] = (300 * 0.9 * digOutSampleRate)/(2 * freqSlopeConst * 1e3)
configParameters["maxVelocity"] = 3e8 / (4 * startFreq * 1e9 * (idleTime + rampEndTime) * 1e-6 * numTxAnt)
return configParameters
# ------------------------------------------------------------------
# Funtion to read and parse the incoming data
def readAndParseData16xx(Dataport, configParameters):
global byteBuffer, byteBufferLength
# Constants
OBJ_STRUCT_SIZE_BYTES = 12;
BYTE_VEC_ACC_MAX_SIZE = 2**15;
MMWDEMO_UART_MSG_DETECTED_POINTS = 1;
MMWDEMO_UART_MSG_RANGE_PROFILE = 2;
maxBufferSize = 2**15;
magicWord = [2, 1, 4, 3, 6, 5, 8, 7]
# Initialize variables
magicOK = 0 # Checks if magic number has been read
dataOK = 0 # Checks if the data has been read correctly
frameNumber = 0
detObj = {}
tlv_type = 0
readBuffer = Dataport.read(Dataport.in_waiting)
byteVec = np.frombuffer(readBuffer, dtype = 'uint8')
byteCount = len(byteVec)
# Check that the buffer is not full, and then add the data to the buffer
if (byteBufferLength + byteCount) < maxBufferSize:
byteBuffer[byteBufferLength:byteBufferLength + byteCount] = byteVec[:byteCount]
byteBufferLength = byteBufferLength + byteCount
# Check that the buffer has some data
if byteBufferLength > 16:
# Check for all possible locations of the magic word
possibleLocs = np.where(byteBuffer == magicWord[0])[0]
# Confirm that is the beginning of the magic word and store the index in startIdx
startIdx = []
for loc in possibleLocs:
check = byteBuffer[loc:loc+8]
if np.all(check == magicWord):
startIdx.append(loc)
# Check that startIdx is not empty
if startIdx:
# Remove the data before the first start index
if startIdx[0] > 0 and startIdx[0] < byteBufferLength:
byteBuffer[:byteBufferLength-startIdx[0]] = byteBuffer[startIdx[0]:byteBufferLength]
byteBuffer[byteBufferLength-startIdx[0]:] = np.zeros(len(byteBuffer[byteBufferLength-startIdx[0]:]),dtype = 'uint8')
byteBufferLength = byteBufferLength - startIdx[0]
# Check that there have no errors with the byte buffer length
if byteBufferLength < 0:
byteBufferLength = 0
# word array to convert 4 bytes to a 32 bit number
word = [1, 2**8, 2**16, 2**24]
# Read the total packet length
totalPacketLen = np.matmul(byteBuffer[12:12+4],word)
# Check that all the packet has been read
if (byteBufferLength >= totalPacketLen) and (byteBufferLength != 0):
magicOK = 1
# If magicOK is equal to 1 then process the message
if magicOK:
# word array to convert 4 bytes to a 32 bit number
word = [1, 2**8, 2**16, 2**24]
# Initialize the pointer index
idX = 0
# Read the header
magicNumber = byteBuffer[idX:idX+8]
idX += 8
version = format(np.matmul(byteBuffer[idX:idX+4],word),'x')
idX += 4
totalPacketLen = np.matmul(byteBuffer[idX:idX+4],word)
idX += 4
platform = format(np.matmul(byteBuffer[idX:idX+4],word),'x')
idX += 4
frameNumber = np.matmul(byteBuffer[idX:idX+4],word)
idX += 4
timeCpuCycles = np.matmul(byteBuffer[idX:idX+4],word)
idX += 4
numDetectedObj = np.matmul(byteBuffer[idX:idX+4],word)
idX += 4
numTLVs = np.matmul(byteBuffer[idX:idX+4],word)
idX += 4
subFrameNumber = np.matmul(byteBuffer[idX:idX+4],word)
idX += 4
# Read the TLV messages
for tlvIdx in range(numTLVs):
# word array to convert 4 bytes to a 32 bit number
word = [1, 2**8, 2**16, 2**24]
# Check the header of the TLV message
try:
tlv_type = np.matmul(byteBuffer[idX:idX+4],word)
idX += 4
tlv_length = np.matmul(byteBuffer[idX:idX+4],word)
idX += 4
except:
pass
# Read the data depending on the TLV message
if tlv_type == MMWDEMO_UART_MSG_DETECTED_POINTS:
# word array to convert 4 bytes to a 16 bit number
word = [1, 2**8]
tlv_numObj = np.matmul(byteBuffer[idX:idX+2],word)
idX += 2
tlv_xyzQFormat = 2**np.matmul(byteBuffer[idX:idX+2],word)
idX += 2
# Initialize the arrays
rangeIdx = np.zeros(tlv_numObj,dtype = 'int16')
dopplerIdx = np.zeros(tlv_numObj,dtype = 'int16')
peakVal = np.zeros(tlv_numObj,dtype = 'int16')
x = np.zeros(tlv_numObj,dtype = 'int16')
y = np.zeros(tlv_numObj,dtype = 'int16')
z = np.zeros(tlv_numObj,dtype = 'int16')
for objectNum in range(tlv_numObj):
# Read the data for each object
rangeIdx[objectNum] = np.matmul(byteBuffer[idX:idX+2],word)
idX += 2
dopplerIdx[objectNum] = np.matmul(byteBuffer[idX:idX+2],word)
idX += 2
peakVal[objectNum] = np.matmul(byteBuffer[idX:idX+2],word)
idX += 2
x[objectNum] = np.matmul(byteBuffer[idX:idX+2],word)
idX += 2
y[objectNum] = np.matmul(byteBuffer[idX:idX+2],word)
idX += 2
z[objectNum] = np.matmul(byteBuffer[idX:idX+2],word)
idX += 2
# Make the necessary corrections and calculate the rest of the data
rangeVal = rangeIdx * configParameters["rangeIdxToMeters"]
dopplerIdx[dopplerIdx > (configParameters["numDopplerBins"]/2 - 1)] = dopplerIdx[dopplerIdx > (configParameters["numDopplerBins"]/2 - 1)] - 65535
dopplerVal = dopplerIdx * configParameters["dopplerResolutionMps"]
#x[x > 32767] = x[x > 32767] - 65536
#y[y > 32767] = y[y > 32767] - 65536
#z[z > 32767] = z[z > 32767] - 65536
x = x / tlv_xyzQFormat
y = y / tlv_xyzQFormat
z = z / tlv_xyzQFormat
# Store the data in the detObj dictionary
detObj = {"numObj": tlv_numObj, "rangeIdx": rangeIdx, "range": rangeVal, "dopplerIdx": dopplerIdx, \
"doppler": dopplerVal, "peakVal": peakVal, "x": x, "y": y, "z": z}
dataOK = 1
# Remove already processed data
if idX > 0 and byteBufferLength > idX:
shiftSize = totalPacketLen
byteBuffer[:byteBufferLength - shiftSize] = byteBuffer[shiftSize:byteBufferLength]
byteBuffer[byteBufferLength - shiftSize:] = np.zeros(len(byteBuffer[byteBufferLength - shiftSize:]),dtype = 'uint8')
byteBufferLength = byteBufferLength - shiftSize
# Check that there are no errors with the buffer length
if byteBufferLength < 0:
byteBufferLength = 0
return dataOK, frameNumber, detObj
# ------------------------------------------------------------------
# Funtion to update the data and display in the plot
def update():
dataOk = 0
global detObj
x = []
y = []
# Read and parse the received data
dataOk, frameNumber, detObj = readAndParseData16xx(Dataport, configParameters)
if dataOk and len(detObj["x"])>0:
#print(detObj)
x = -detObj["x"]
y = detObj["y"]
s.setData(x,y)
QtGui.QApplication.processEvents()
return dataOk
# ------------------------- MAIN -----------------------------------------
# Configurate the serial port
CLIport, Dataport = serialConfig(configFileName)
# Get the configuration parameters from the configuration file
configParameters = parseConfigFile(configFileName)
# START QtAPPfor the plot
app = QtGui.QApplication([])
# Set the plot
pg.setConfigOption('background','w')
win = pg.GraphicsLayoutWidget(title="2D scatter plot")
p = win.addPlot()
p.setXRange(-0.5,0.5)
p.setYRange(0,1.5)
p.setLabel('left',text = 'Y position (m)')
p.setLabel('bottom', text= 'X position (m)')
s = p.plot([],[],pen=None,symbol='o')
win.show()
# Main loop
detObj = {}
frameData = {}
currentIndex = 0
while True:
try:
# Update the data and check if the data is okay
dataOk = update()
if dataOk:
# Store the current frame into frameData
frameData[currentIndex] = detObj
currentIndex += 1
time.sleep(0.03) # Sampling frequency of 30 Hz
# Stop the program and close everything if Ctrl + c is pressed
except KeyboardInterrupt:
CLIport.write(('sensorStop\n').encode())
CLIport.close()
Dataport.close()
win.close()
break```
[1]: https://i.sstatic.net/BYLnW.png
Upvotes: 0
Views: 42