Reputation: 3657
Raspberry Pi 3B+, W10 x64, Paho, Mosquitto MQTT
I have data being sent in 358.4 Kb payloads once a second via MQTT from a Raspberry Pi to a W10 x64 machine. I'm getting the following with different QoS values,
QoS 0: most of the data but some payloads missing, different sent & recieved counts.
QoS 1: most of the data but some payloads missing, different sent & recieved counts.
QoS 2: publisher claims to have sent everything even though the subscriber is recieving nothing. I have a running message at both ends showing the number of sent & recieved in real time.
Anyone know why this is happening?
The publisher side is in C & relevent code snippets below,
#include "../daqhats/examples/c/daqhats_utils.h"
// Whole load of includes here
#include "MQTTClient.h"
#define MQTT_ADDRSS "tcp://localhost:1883"
#define CLIENTID "ExampleClientPub"
#define TOPIC "mqtt_test"
#define QOS 2
#define TIMEOUT 1000L
int pub_count = 0;
int publish(MQTTClient client,
MQTTClient_deliveryToken token,
MQTTClient_connectOptions conn_opts,
char* payload,
int device)
{
int rc = 0;
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
exit(-1);
}
int payload_len = 25600 * sizeof(double);
MQTTClient_publish(client, TOPIC, payload_len, &payload, QOS, 0, &token);
printf("Published: %d\n", device);
pub_count++;
return rc;
}
int main(void)
{
// MQTT initialzers
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_deliveryToken token;
MQTTClient_create(&client, MQTT_ADDRSS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
union DAQ_Data{
double f[2 * 25600];
char s[2 * 25600 * sizeof(double)];
};
// A whole load of DAQ hardware set up here
union DAQ_Data payload;
do //while (is_running)
{
for (device = 0; device < DEVICE_COUNT; device++)
{
// Read data
result = mcc172_a_in_scan_read(address[device], &scan_status[device], samples_to_read,
timeout, payload.f, buffer_size,
&samples_read[device]);
publish(client, token, conn_opts, payload.s, device);
// More DAQ stuff
}
fflush(stdout);
usleep(100000);
if (enter_press())
{
printf("Aborted - enter pressed\n\n");
break;
}
}
while (!enter_press());
printf("Pub count: %d\n", pub_count);
sleep(2);
// DAQ hardware clean up
return 0;
}
The PC subscriber side in Python, taken fromm the Eclipse MQTT Client example,
import argparse
import os
import ssl
import sys
import paho.mqtt.client as mqtt
parser = argparse.ArgumentParser()
parser.add_argument('-H', '--host', required=False, default="192.168.30.212")#"mqtt.eclipseprojects.io")
parser.add_argument('-t', '--topic', required=False, default="mqtt_test") #"$SYS/#")
parser.add_argument('-q', '--qos', required=False, type=int, default=2)
parser.add_argument('-c', '--clientid', required=False, default=None)
parser.add_argument('-u', '--username', required=False, default=None)
parser.add_argument('-d', '--disable-clean-session', action='store_true', help="disable 'clean session' (sub + msgs not cleared when client disconnects)")
parser.add_argument('-p', '--password', required=False, default=None)
parser.add_argument('-P', '--port', required=False, type=int, default=None, help='Defaults to 8883 for TLS or 1883 for non-TLS')
parser.add_argument('-k', '--keepalive', required=False, type=int, default=60)
parser.add_argument('-s', '--use-tls', action='store_true')
parser.add_argument('--insecure', action='store_true')
parser.add_argument('-F', '--cacerts', required=False, default=None)
parser.add_argument('--tls-version', required=False, default=None, help='TLS protocol version, can be one of tlsv1.2 tlsv1.1 or tlsv1\n')
parser.add_argument('-D', '--debug', action='store_true')
args, unknown = parser.parse_known_args()
mq = open("mq_test.bin", "wb")
rec_count = 0
def on_connect(mqttc, obj, flags, rc):
print("rc: " + str(rc))
def on_message(mqttc, obj, msg):
global rec_count
rec_count = rec_count + 1
# message = msg.payload.decode("utf-8") + '\n'
mq.write(msg.payload)
print("recieved: ", rec_count)
def on_publish(mqttc, obj, mid):
print("mid: " + str(mid))
def on_subscribe(mqttc, obj, mid, granted_qos):
print("Subscribed: " + str(mid) + " " + str(granted_qos))
def on_log(mqttc, obj, level, string):
print(string)
usetls = args.use_tls
if args.cacerts:
usetls = True
port = args.port
if port is None:
if usetls:
port = 8883
else:
port = 1883
mqttc = mqtt.Client(args.clientid,clean_session = not args.disable_clean_session)
if usetls:
if args.tls_version == "tlsv1.2":
tlsVersion = ssl.PROTOCOL_TLSv1_2
elif args.tls_version == "tlsv1.1":
tlsVersion = ssl.PROTOCOL_TLSv1_1
elif args.tls_version == "tlsv1":
tlsVersion = ssl.PROTOCOL_TLSv1
elif args.tls_version is None:
tlsVersion = None
else:
print ("Unknown TLS version - ignoring")
tlsVersion = None
if not args.insecure:
cert_required = ssl.CERT_REQUIRED
else:
cert_required = ssl.CERT_NONE
mqttc.tls_set(ca_certs=args.cacerts, certfile=None, keyfile=None, cert_reqs=cert_required, tls_version=tlsVersion)
if args.insecure:
mqttc.tls_insecure_set(True)
if args.username or args.password:
mqttc.username_pw_set(args.username, args.password)
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe
if args.debug:
mqttc.on_log = on_log
print("Connecting to "+args.host+" port: "+str(port))
mqttc.connect(args.host, port, args.keepalive)
mqttc.subscribe(args.topic, args.qos)
mqttc.loop_forever()
Upvotes: 0
Views: 893
Reputation: 712
There are a couple problems and points to note:
The MQTT protocol encodes the payload size into the MQTT packet so if only partial data is being sent or received, that is a Malformed Packet and it depends on the client/broker how it handles that. But the point is that, valid packets received by your Python code are guaranteed to have the correct payload.
You are using loop_forever()
which calls the callbacks asynchronously and at the same time you are writing to mq
synchronously - this is guaranteed data loss. Handle the data in an async manner - write to separate files, use a database, pass off to another process. Remember that packets aren't even guaranteed to be received in order.
If you use the loop_start() or loop_forever functions then the loop runs in a separate thread, and it is the loop that processes the incoming and outgoing messages.
In this case the callbacks can occur any time in your script and are asynchronous.
I believe these are the core issues. Fix those and you should have no more data loss.
Upvotes: 1