galaxy_twirl

Reputation: 75

Raspberry Pi UART Interrupt in Python

I am trying to collect data coming in from 3 Arduino nodes (which are connected to some sensors) via XBees. The 3 Arduino nodes are each attached to an XBee Series 1 radio (End Device), while the Coordinator is connected to my PC (but it will later be connected to a Raspberry Pi 2 Model B).

I was wondering: is there a way to implement UART interrupts in Python on the Raspberry Pi? I ask because, as the code below shows, I currently use a polling approach to read the data coming in from my 3 Arduino sensor nodes. Each node sends its data once every second.

However, my polling approach is causing some issues, because I only call arduino.readline() at the very beginning of my code and then go on to do other things in the Python code.

Hence, I was wondering whether there is any way to implement a UART interrupt in Python, so that whenever there is incoming data I can call arduino.readline() and go on to handle the data.

My code is below. (The Arduino code is about the same for all 3 nodes.)

Python code:

import threading
import logging
import time
import serial
import csv
# Arduino; Arduino is now replaced by XBee modules
arduino = serial.Serial('COM18', 9600, timeout=1)  # Open serial port. By right, should check if it is open
def checker(field):
    if 'b' in field:
        field = field.replace('b', '')
    if 't' in field:
        field = field.replace('t', '')
    return field
def make_backup(finalmatrix, field_send, flag): # Back up the 2 most-recent data.
    matrix1 = []  # declares a list
    matrix1.append(flag) # appends the counter 'flag'
    matrix1.extend(field_send) # attaches the list to the end of current list; doesn't work with int
    # finalmatrix is a list of list; if len(finalmatrix) == 2 means: finalmatrix == [[1,2], [3,4]]
    if (len(finalmatrix) == 2):
        finalmatrix = finalmatrix[1:] # Create a new list, take the 2nd mini-list in the list and put it in the new list
        finalmatrix.append(matrix1) # appends the new matrix1
    else:
        finalmatrix.append(matrix1)
    return finalmatrix
def acquire_data():
    flag = 1
    finalmatrix = []
    count = 0
    while True:
        # data_in
        start_time = time.time()
        try:
            data_in = arduino.readline()  # read serial data from Arduino
            # logging.warning(data_in) # debugger
        except:
            pass
        global data_stripped
        data_stripped = data_in.strip()  # Removes spaces and \n
        for data_stripped in arduino:
            if data_stripped.startswith('b') and data_stripped.count(
                    ',') == 3:  # first char identifies where data is coming from; count commas to double-check incoming string
                field = data_stripped.split(',')  # split data to be put into 'boxes'
                bed_sen = field[0] + ',' + field[1] + ',' + field[2]  # We have 3 data sensor fields
                bed_sen_fill = True  # A flag to show that this if-statement has been completed
            if data_stripped.startswith('t') and data_stripped.count(',') == 3:
                field = data_stripped.split(',')
                table_sen = field[0] + ',' + field[1] + ',' + field[2]
                table_sen_fill = True
            if data_stripped.startswith('d') and data_stripped.count(',') == 3:
                field = data_stripped.split(',')
                door_sen = field[0] + ',' + field[1] + ',' + field[2]
                door_sen_fill = True
            try:
                if bed_sen_fill == True and table_sen_fill == True and door_sen_fill == True:
                    data_combi = bed_sen + ',' + table_sen + ',' + door_sen
                    break
            except:
                pass
        if data_combi:
            datasplit = data_combi.split(",")
            field1 = datasplit[0]
            field2 = datasplit[1]
            field3 = datasplit[2]
            field4 = datasplit[3]
            field5 = datasplit[4]
            field6 = datasplit[5]
            field7 = datasplit[6]  # To be deprecated in future; IR sensor no longer in use.
            field8 = datasplit[7]
            field9 = datasplit[8]
        # Removes 'b', 't' chars in strings
        field1 = checker(field1)
        field2 = checker(field2)
        field3 = checker(field3)
        field4 = checker(field4)
        field5 = checker(field5)
        field6 = checker(field6)
        field8 = checker(field8)
        field9 = checker(field9)
        ##### Data Validity Check #####
        field_send = []  # rearrange into list
        field_send.append(field1)
        field_send.append(field2)
        field_send.append(field3)
        field_send.append(field4)
        field_send.append(field5)
        field_send.append(field6)
        field_send.append(field7)
        field_send.append(field8)
        field_send.append(field9)
        if flag == 1:
            finalmatrix = make_backup(finalmatrix, field_send, flag)
            flag = flag + 1
        else:
            finalmatrix = make_backup(finalmatrix, field_send, flag)
            flag = 1
        # print finalmatrix
        if (count == 2 and int(field1) > 1000 or field1 == ''):  # If re-sends, 'b' will appear twice; light_val<1000 always.
            field1 = (int(finalmatrix[0][1]) + int(finalmatrix[1][1])) / 2
            # print ("Error1!")
        if (count == 2 and field2.count('-') == 2 or len(field2) > 3 or field2 == ''):  # If re-send, '-' will appear twice; field2 max 3 digits
            field2 = (int(finalmatrix[0][2]) + int(finalmatrix[1][2])) / 2
            # print ("Error2!")
        if (count == 2 and len(field3) > 1 or field3.count('01') == 1 or field3.count('10') == 1 or field3.count('-') == 1 or field3 == ''):  # Motion_sen is purely binary.
            field3 = (int(finalmatrix[0][3]) + int(finalmatrix[1][3])) / 2
            # print ("Error3!")
        if (count == 2 and int(field4) > 1000 or field4 == ''):  # If re-sends, 't' will appear twice; light_val<1000 always.
            field4 = (int(finalmatrix[0][4]) + int(finalmatrix[1][4])) / 2
            # print ("Error4!")
        if (count == 2 and field5.count('-') == 2 or len(field5) > 3 or field5 == ''):  # If re-send, '-' will appear twice; field5 max 3 digits
            field5 = (int(finalmatrix[0][5]) + int(finalmatrix[1][5])) / 2
            # print ("Error5!")
        if (count == 2 and len(field6) > 1 or field6.count('01') == 1 or field6.count('10') == 1 or field6.count('-') == 1 or field6 == ''):  # Motion_sen is purely binary.
            field6 = (int(finalmatrix[0][6]) + int(finalmatrix[1][6])) / 2
            # print ("Error6!")
        if (count == 2 and len(field8) > 1 or field8.count('01') == 1 or field8.count('10') == 1 or field8.count('-') == 1 or field8 == ''):  # Motion_sen is binary.
            field8 = (int(finalmatrix[0][8]) + int(finalmatrix[1][8])) / 2
            # print ("Error7!")
        if (count == 2 and len(field9) > 1 or field9.count('01') == 1 or field9.count('10') == 1 or field9.count('-') == 1 or field9 == ''):  # SW is binary too!
            field9 = (int(finalmatrix[0][9]) + int(finalmatrix[1][9])) / 2
            # print ("Error8!")
        with open('abs_testdatadiscrep4.csv', 'ab') as csvfile:  # 'ab' to remove newline char after each print
            writer = csv.writer(csvfile)
            sensor_fields = [field1, field2, field3, field4, field5, field6, field7, field8, field9,
                             time.strftime("%H%M%S")]
            if time.time() - start_time >= 1:
                writer.writerow(sensor_fields)
        count = count + 1
        if count == 2:
            count = 2
        print "My program took", time.time() - start_time, "to run"
        time.sleep(0.5)
        # print "My program took", time.time() - start_time, "to run"
def counting():
    while True:
        sum = 3 + 2
        sum2 = sum * 8
        time.sleep(0.2)
def on_light():
    strin = '1'
    arduino.write(strin.encode())
    print "Confirm ON"
def off_light():
    strin = '0'
    arduino.write(strin.encode())
    print "Confirm OFF"
# now threading1 runs regardless of user input
threading1 = threading.Thread(target = acquire_data)
threading2 = threading.Thread(target = counting)
threading1.start()
threading2.start()
while True:
    if raw_input() == 't':
        on_light()
        print "ON"
    if raw_input() == 'r':
        off_light()
        print "OFF"
    time.sleep(1)

Arduino code:

#include <SoftwareSerial.h>

#define lightPin A0 // Light Sensor at Arduino A0 pin
#define echoPin 7 // Echo Pin
#define trigPin 8 // Trigger Pin
#define LEDPinUS 13 // LED at Pin 13 (Ultrasound)
#define pirPin 9 // Input for HC-SR501
#define LEDPinPIR 12 // LED at Pin 12 (PIR)

SoftwareSerial xbee(2, 3); // RX, TX
int light ; //Value from light sensor
String lightID = "0";
String distanceUS = "0";
String pirID = "0";
String comma = ",";
int maximumRange = 150; // Maximum range needed
int minimumRange = 0; // Minimum range needed
long duration, distance; // Duration used to calculate distance
int OOR = 0; // OOR = Out of Range

int pirValue; // Place to store read PIR Value
int pirNum = 0;
int pirNumyes = 0;
int pirNumno = 0;

void setup() {
  Serial.begin(9600); //Baud rate must be the same as is on xBee module
  xbee.begin(9600);
  pinMode(lightPin, INPUT); // Light sensor
  pinMode(trigPin, OUTPUT); // Ultrasound sensor
  pinMode(echoPin, INPUT); // Ultrasound sensor
  pinMode(LEDPinUS, OUTPUT); // Ultrasound sensor indicator
  pinMode(pirPin, INPUT); // PIR sensor
  pinMode(LEDPinPIR, OUTPUT); // PIR sensor indicator
  digitalWrite(LEDPinUS, LOW);
  digitalWrite(LEDPinPIR, LOW);
}

void loop() {
  // Light Sensor, "t" = table_sensors
  light = analogRead(lightPin);
  String ID = "t";
  lightID = ID + light + comma; //20-Mar-18: Added ","; disabled ',' printed serially
  // Ultrasound Sensor
  digitalWrite(trigPin, LOW); // clears Trigger pin
  delayMicroseconds(2);
  digitalWrite(trigPin, HIGH);
  delayMicroseconds(10);
  digitalWrite(trigPin, LOW);
  duration = pulseIn(echoPin, HIGH);
  // Calculate the distance (in cm) based on the speed of sound.
  distance = duration / 58.2;
  if (distance >= maximumRange || distance <= minimumRange) {
    /* Send a negative number to computer and Turn LED ON
      to indicate "out of range" */
    OOR = -1;
    distanceUS = OOR + comma; //20-Mar-18: Added ","; disabled ',' printed serially; removed "+ ID"
    digitalWrite(LEDPinUS, HIGH);
  } else {
    /* Send the distance to the computer using Serial protocol, and
      turn LED OFF to indicate successful reading. */
    distanceUS = distance + comma; //20-Mar-18: Added ","; disabled ',' printed serially; removed "+ ID"
    digitalWrite(LEDPinUS, LOW);
  }
  // Motion sensor
  pirValue = digitalRead(pirPin);
  if (pirValue == HIGH) {
    pirNumyes = 1;
    pirID = pirNumyes + comma; //20-Mar-18: Added ","; disabled ',' printed serially; removed "+ ID"
    digitalWrite(LEDPinPIR, HIGH);
  } else {
    pirNumno = 0;
    pirID = pirNumno + comma; //20-Mar-18: Added ","; disabled ',' printed serially; removed "+ ID"
    digitalWrite(LEDPinPIR, LOW);
  }
  String together = lightID + distanceUS + pirID;
  //Serial.println(together);
  xbee.println(together);
  delay(1000);
}

I have tried searching Google for an answer, but I can't seem to find one. The closest I found was an answer to a similar Stack Overflow question, "Read serial data without high CPU use".

However, may I ask if there is a Python implementation for UART interrupts on Raspberry Pi?

Thank you very much for all your help! :)

Upvotes: 2

Views: 4256

Answers (1)

Robert Farmer

Reputation: 21

If you run an additional wire from the RX pin to any available GPIO pin, you can at least use the RPi.GPIO module's edge interrupt to alert you that a transfer is in progress. It won't tell you that the buffer is full; you'll have to check for that yourself when you service the GPIO interrupt. At least this would allow you to do something other than wait in a loop and/or poll the UART for buffer data.
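A rough sketch of that idea, not a tested setup: it is written for Python 3 (the question's code is Python 2), and the BCM pin number 17, the device path /dev/serial0, and the helper names drain_lines and run_on_pi are all assumptions made for illustration. The GPIO callback only sets a flag; the main loop sleeps on that flag and then reads lines until readline() times out, since the edge only signals that a transfer has started.

```python
import queue
import threading


def drain_lines(port, lines):
    """Move every line currently arriving on `port` into the `lines` queue.

    `port` only needs a readline() that returns b"" on timeout, which is how
    a pyserial port opened with a timeout behaves. Returns the number of
    non-empty lines that were queued.
    """
    queued = 0
    while True:
        raw = port.readline()
        if not raw:  # timed out with nothing received: the burst is over
            return queued
        text = raw.decode("ascii", errors="replace").strip()
        if text:
            lines.put(text)
            queued += 1


def run_on_pi(rx_sense_pin=17, device="/dev/serial0"):
    """Hardware-dependent part; the pin and device path are assumptions."""
    import serial            # pyserial
    import RPi.GPIO as GPIO  # importable only on a Raspberry Pi

    port = serial.Serial(device, 9600, timeout=1)
    lines = queue.Queue()
    rx_active = threading.Event()

    GPIO.setmode(GPIO.BCM)
    GPIO.setup(rx_sense_pin, GPIO.IN)
    # An idle UART line sits high and a start bit pulls it low, so a falling
    # edge means "a transfer has begun", not "a full line is ready".
    GPIO.add_event_detect(rx_sense_pin, GPIO.FALLING,
                          callback=lambda channel: rx_active.set())
    try:
        while True:
            rx_active.wait()   # sleeps here instead of polling the UART
            rx_active.clear()
            drain_lines(port, lines)
            while not lines.empty():
                print(lines.get())
    finally:
        GPIO.cleanup()
        port.close()
```

On the Pi you would call run_on_pi() from your main script. Every byte on the wire produces further falling edges, so the flag will often be set again while a line is still being read; in this sketch that only costs one extra readline() timeout before the loop goes back to sleep.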

Upvotes: 2
