DrBwts
DrBwts

Reputation: 3657

MQTT: Client publishing but subscribers not getting data with QoS 2

Raspberry Pi 3B+, W10 x64, Paho, Mosquitto MQTT

I have data being sent in 358.4 Kb payloads once a second via MQTT from a Raspberry Pi to a W10 x64 machine. I'm getting the following with different QoS values,

QoS 0: most of the data arrives but some payloads are missing; the sent & received counts differ.

QoS 1: most of the data arrives but some payloads are missing; the sent & received counts differ.

QoS 2: the publisher claims to have sent everything even though the subscriber is receiving nothing. I have a running message at both ends showing the number of payloads sent & received in real time.

Anyone know why this is happening?

The publisher side is in C; the relevant code snippets are below,

#include "../daqhats/examples/c/daqhats_utils.h"
// Whole load of includes here
#include "MQTTClient.h"

// Broker URI handed to MQTTClient_create() in main().
#define MQTT_ADDRSS "tcp://localhost:1883"
// Client identifier handed to MQTTClient_create() in main().
#define CLIENTID    "ExampleClientPub"
// Topic every payload is published on (see publish()).
#define TOPIC       "mqtt_test"
// Quality-of-service level requested for every publish (see publish()).
#define QOS         2
// Timeout constant for MQTT operations.
// NOTE(review): Paho timeouts are in milliseconds -- confirm at the point of use.
#define TIMEOUT     1000L 

// Running count of publish() calls; printed by main() on exit.
int pub_count = 0;

/*
 * Publish one DAQ payload on TOPIC at the configured QOS and wait for the
 * broker to confirm delivery.
 *
 * client    - Paho client handle created by the caller.
 * token     - delivery token storage (passed by value; only used locally).
 * conn_opts - connection options used if the client is not yet connected.
 * payload   - pointer to the first byte of the sample buffer to send.
 * device    - index of the DAQ device, used only for the progress message.
 *
 * Returns MQTTCLIENT_SUCCESS when the message was handed over and its
 * delivery was confirmed, otherwise the Paho error code of the step that
 * failed.  A failed connection attempt still terminates the process, as in
 * the original code.  pub_count is incremented only for confirmed messages,
 * so the final count reflects what the broker actually accepted.
 */
int publish(MQTTClient client, 
            MQTTClient_deliveryToken token, 
            MQTTClient_connectOptions conn_opts, 
            char* payload,
            int device)
{
    int rc = MQTTCLIENT_SUCCESS;

    /* Connect only when needed.  Re-issuing a connect on every call is
     * unnecessary and, with a clean session, may discard the state of
     * QoS 1/2 messages that are still in flight. */
    if (!MQTTClient_isConnected(client))
    {
        if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
        {
            printf("Failed to connect, return code %d\n", rc);
            exit(-1);
        }
    }

    /* NOTE(review): this is half of the caller's union DAQ_Data buffer
     * (2 * 25600 doubles) -- confirm that one channel's worth is intended. */
    int payload_len = 25600 * sizeof(double);

    /* Pass the buffer itself.  The previous code passed &payload, i.e. the
     * address of this function's local pointer variable, so the broker was
     * sent stack contents instead of the sample data. */
    rc = MQTTClient_publish(client, TOPIC, payload_len, payload, QOS, 0, &token);
    if (rc != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to publish, return code %d\n", rc);
        return rc;
    }

    /* For QoS 1/2 the publish call only queues the message; block until the
     * acknowledgement handshake has finished (or TIMEOUT expires) before
     * reporting it as sent. */
    rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
    if (rc != MQTTCLIENT_SUCCESS)
    {
        printf("Delivery not confirmed, return code %d\n", rc);
        return rc;
    }

    printf("Published: %d\n", device);
    pub_count++;

    return rc;
}
        

/*
 * Publisher entry point: creates the MQTT client, then repeatedly reads a
 * scan from each DAQ device and publishes it until Enter is pressed.
 *
 * NOTE(review): this is an excerpt -- device, result, address, scan_status,
 * samples_to_read, timeout, buffer_size and samples_read are presumably
 * declared in the elided DAQ set-up section.
 */
int main(void)
{
    // MQTT initializers
    MQTTClient client;
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    // NOTE(review): token is never initialized here; it is handed to
    // publish() by value, so anything publish() stores in it is not seen
    // by main().
    MQTTClient_deliveryToken token;
    // No persistence store is configured; the return code of the create call
    // is not checked.
    MQTTClient_create(&client, MQTT_ADDRSS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    // Lets the same buffer be filled as doubles (f) by the DAQ read and
    // handed to publish() as raw bytes (s).
    union DAQ_Data{
        double f[2 * 25600];
        char   s[2 * 25600 * sizeof(double)];
    };

    
   // A whole load of DAQ hardware set up here


    union DAQ_Data payload;
    
    do //while (is_running)
    {
        for (device = 0; device < DEVICE_COUNT; device++)
        {
            // Read data
            // The scan is written into payload.f, overwriting what the
            // previous device put there.
            result = mcc172_a_in_scan_read(address[device], &scan_status[device], samples_to_read,
                                           timeout, payload.f, buffer_size, 
                                           &samples_read[device]);
                                           
            // The value returned by publish() is not inspected.
            publish(client, token, conn_opts, payload.s, device);
            
            // More DAQ stuff
        }


        fflush(stdout);
        
        // Pause 100000 us (100 ms) between passes over the devices.
        usleep(100000);
        

        // NOTE(review): enter_press() is polled here and again in the loop
        // condition below -- confirm that calling it twice per pass is
        // intended.
        if (enter_press())
        {
            printf("Aborted - enter pressed\n\n");
            break;
        }

    }
    while (!enter_press());
 
    printf("Pub count: %d\n", pub_count);
    sleep(2);

    // DAQ hardware clean up
    // NOTE(review): no MQTTClient_disconnect()/MQTTClient_destroy() call is
    // visible in this excerpt -- confirm the client is released somewhere.

    return 0;
}

The PC subscriber side is in Python, taken from the Eclipse MQTT Client example,

import argparse
import os
import ssl
import sys

import paho.mqtt.client as mqtt

# Command-line interface of the subscriber. Every option is optional; the
# defaults below point at the test broker and topic used by the publisher.
parser = argparse.ArgumentParser()

# Broker location and subscription.
parser.add_argument('-H', '--host', default="192.168.30.212")  # "mqtt.eclipseprojects.io"
parser.add_argument('-t', '--topic', default="mqtt_test")  # "$SYS/#"
parser.add_argument('-q', '--qos', type=int, default=2)

# Session identity and credentials.
parser.add_argument('-c', '--clientid')
parser.add_argument('-u', '--username')
parser.add_argument(
    '-d', '--disable-clean-session',
    action='store_true',
    help="disable 'clean session' (sub + msgs not cleared when client disconnects)",
)
parser.add_argument('-p', '--password')

# Transport settings.
parser.add_argument(
    '-P', '--port',
    type=int,
    help='Defaults to 8883 for TLS or 1883 for non-TLS',
)
parser.add_argument('-k', '--keepalive', type=int, default=60)
parser.add_argument('-s', '--use-tls', action='store_true')
parser.add_argument('--insecure', action='store_true')
parser.add_argument('-F', '--cacerts')
parser.add_argument(
    '--tls-version',
    help='TLS protocol version, can be one of tlsv1.2 tlsv1.1 or tlsv1\n',
)

# Diagnostics.
parser.add_argument('-D', '--debug', action='store_true')

# Unrecognised arguments are collected in `unknown` instead of aborting.
args, unknown = parser.parse_known_args()

# Module state shared with on_message(): the binary dump file that received
# payloads are appended to, and the number of messages received so far.
mq = open("mq_test.bin", "wb")
rec_count = 0

def on_connect(mqttc, obj, flags, rc):
    """Print the result code passed to the connect callback."""
    print(f"rc: {rc!s}")


def on_message(mqttc, obj, msg):
    """Count an incoming message and append its raw payload to the dump file.

    The payload is written to the module-level ``mq`` file object exactly as
    received (no decoding), and the running total kept in the module-level
    ``rec_count`` is printed after each message.
    """
    global rec_count

    rec_count += 1
    mq.write(msg.payload)
    print("recieved: ", rec_count)


def on_publish(mqttc, obj, mid):
    """Print the message id passed to the publish callback."""
    print(f"mid: {mid!s}")


def on_subscribe(mqttc, obj, mid, granted_qos):
    """Print the message id and granted QoS passed to the subscribe callback."""
    print(f"Subscribed: {mid!s} {granted_qos!s}")


def on_log(mqttc, obj, level, string):
    """Print a log line handed to the log callback; ``level`` is ignored."""
    print(f"{string!s}")

# TLS is used when requested explicitly (-s) or implied by a CA bundle (-F).
usetls = True if args.cacerts else args.use_tls

# Without an explicit port, pick 8883 for TLS and 1883 for plain TCP.
port = args.port
if port is None:
    port = 8883 if usetls else 1883

mqttc = mqtt.Client(args.clientid, clean_session=not args.disable_clean_session)

if usetls:
    # Maps each accepted --tls-version value to the name of the matching
    # constant in the ssl module; the constant is only looked up when chosen.
    tls_protocol_names = {
        "tlsv1.2": "PROTOCOL_TLSv1_2",
        "tlsv1.1": "PROTOCOL_TLSv1_1",
        "tlsv1": "PROTOCOL_TLSv1",
    }
    if args.tls_version in tls_protocol_names:
        tlsVersion = getattr(ssl, tls_protocol_names[args.tls_version])
    else:
        if args.tls_version is not None:
            print ("Unknown TLS version - ignoring")
        tlsVersion = None

    # --insecure turns off certificate verification.
    cert_required = ssl.CERT_NONE if args.insecure else ssl.CERT_REQUIRED

    mqttc.tls_set(
        ca_certs=args.cacerts,
        certfile=None,
        keyfile=None,
        cert_reqs=cert_required,
        tls_version=tlsVersion,
    )

    if args.insecure:
        mqttc.tls_insecure_set(True)

# Credentials are set if either a username or a password was supplied.
if args.username or args.password:
    mqttc.username_pw_set(args.username, args.password)

# Register the callbacks defined earlier in the script.
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe

# Library log output is printed only when --debug is given.
if args.debug:
    mqttc.on_log = on_log

print(f"Connecting to {args.host} port: {port!s}")
mqttc.connect(args.host, port, args.keepalive)
# The subscription is requested right after connect() returns, before the
# network loop is started.
mqttc.subscribe(args.topic, args.qos)

mqttc.loop_forever()

Upvotes: 0

Views: 893

Answers (1)

Peter Giacomo Lombardo
Peter Giacomo Lombardo

Reputation: 712

There are a couple problems and points to note:

  1. The MQTT protocol encodes the payload size into the MQTT packet so if only partial data is being sent or received, that is a Malformed Packet and it depends on the client/broker how it handles that. But the point is that, valid packets received by your Python code are guaranteed to have the correct payload.

  2. You are using loop_forever() which calls the callbacks asynchronously and at the same time you are writing to mq synchronously - this is guaranteed data loss. Handle the data in an async manner - write to separate files, use a database, pass off to another process. Remember that packets aren't even guaranteed to be received in order.

If you use the loop_start() or loop_forever functions then the loop runs in a separate thread, and it is the loop that processes the incoming and outgoing messages.

In this case the callbacks can occur any time in your script and are asynchronous.

Source

I believe these are the core issues. Fix those and you should have no more data loss.

Upvotes: 1

Related Questions