Paho Man
Paho Man

Reputation: 3

What is the right way to run this Paho MQTT code in a goroutine and pass it messages via a channel to publish over websockets?

This is the standard code I am using to publish messages for testing purposes:

// main connects to a local MQTT broker, subscribes to the "logs" topic as soon
// as the connection is established, publishes five test messages to that same
// topic, then unsubscribes and disconnects.
func main() {
    // Single source of truth for the topic so that the log output, the
    // subscription and the publishes cannot drift apart.
    const topic = "logs"

    opts := MQTT.NewClientOptions().AddBroker("tcp://127.0.0.1:1883")
    opts.SetClientID("myclientid_")
    opts.SetDefaultPublishHandler(f)
    opts.SetConnectionLostHandler(connLostHandler)

    opts.OnConnect = func(c MQTT.Client) {
        // Report the topic that is actually subscribed to.
        fmt.Printf("Client connected, subscribing to: %s\n", topic)

        if token := c.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
            fmt.Println(token.Error())
            os.Exit(1)
        }
    }

    c := MQTT.NewClient(opts)
    if token := c.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    for i := 0; i < 5; i++ {
        text := fmt.Sprintf("this is msg #%d!", i)
        // Wait for each publish to complete and report a failure instead of
        // silently dropping it.
        if token := c.Publish(topic, 0, false, text); token.Wait() && token.Error() != nil {
            fmt.Println(token.Error())
        }
    }

    // NOTE(review): presumably gives the subscription time to receive the
    // messages published above before tearing down - confirm.
    time.Sleep(3 * time.Second)

    if token := c.Unsubscribe(topic); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }

    // Disconnect is called with a quiesce value of 250.
    c.Disconnect(250)
}

This works well! But when passing a large number of messages while also doing high-latency tasks, the performance of my program will be low, so I have to use a goroutine and a channel.

So I was looking for a way to run a worker inside a goroutine for PUBLISHING messages to the browser using the Paho MQTT library for Go. I had a hard time finding a solution that fits my needs, but after some searching I found this code:

package main

import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io/ioutil"
    "strings"
    "time"

    MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
    "linksmart.eu/lc/core/catalog"
    "linksmart.eu/lc/core/catalog/service"
)

// MQTTConnector provides MQTT protocol connectivity.
// It owns the broker client, a channel of outgoing responses consumed by
// publisher(), and the topic lookup tables used for publishing and subscribing.
type MQTTConnector struct {
    // config holds the broker settings read by start() and
    // configureMqttConnection() (URL, discovery flag, credentials, TLS files).
    config        *MqttProtocol
    // clientID is passed to the MQTT client options as the client identifier.
    clientID      string
    // client is (re)created by configureMqttConnection().
    client        *MQTT.Client
    // pubCh carries outgoing responses; publisher() ranges over it.
    pubCh         chan AgentResponse
    // subCh is a send-only channel of data requests.
    // NOTE(review): not used in the visible code; presumably fed by messageHandler - confirm.
    subCh         chan<- DataRequest
    // pubTopics is indexed by a response's ResourceId to find its publish topic.
    pubTopics     map[string]string
    // subTopicsRvsd is keyed by subscription topic; onConnected() subscribes to every key.
    subTopicsRvsd map[string]string // store SUB topics "reversed" to optimize lookup in messageHandler
}

// defaultQoS is the QoS level used both when publishing (publisher) and when
// subscribing (onConnected).
const defaultQoS = 1

// start brings the connector up: it optionally discovers the broker endpoint,
// builds the MQTT client, and launches the connection and publisher goroutines.
// It returns early, starting nothing, if endpoint discovery fails.
func (c *MQTTConnector) start() {
    logger.Println("MQTTConnector.start()")

    // Discovery is only attempted when it is enabled and no URL was configured.
    needsDiscovery := c.config.Discover && c.config.URL == ""
    if needsDiscovery {
        if err := c.discoverBrokerEndpoint(); err != nil {
            logger.Println("MQTTConnector.start() failed to start publisher:", err.Error())
            return
        }
    }

    // Build the MQTT client from the current configuration.
    c.configureMqttConnection()

    // Connect in the background, starting without any back-off delay.
    logger.Printf("MQTTConnector.start() Will connect to the broker %v\n", c.config.URL)
    go c.connect(0)

    // Drain pubCh in the background.
    go c.publisher()
}

// publisher reads outgoing messages from pubCh and publishes them to the
// broker. It runs until pubCh is closed.
//
// A message is discarded (and logged) when the client is not connected, when
// the response is flagged as an error, or when no publish topic is configured
// for the response's resource.
func (c *MQTTConnector) publisher() {
    for resp := range c.pubCh {
        if !c.client.IsConnected() {
            logger.Println("MQTTConnector.publisher() got data while not connected to the broker. **discarded**")
            continue
        }
        if resp.IsError {
            logger.Println("MQTTConnector.publisher() data ERROR from agent manager:", string(resp.Payload))
            continue
        }
        topic := c.pubTopics[resp.ResourceId]
        if topic == "" {
            // A missing map entry yields the empty string, which is not a
            // valid MQTT topic name; drop the message instead of publishing it.
            logger.Println("MQTTConnector.publisher() no PUB topic configured for resource", resp.ResourceId, "**discarded**")
            continue
        }
        // The returned token is deliberately ignored: we don't wait for
        // confirmation from the broker, to avoid blocking this loop.
        c.client.Publish(topic, byte(defaultQoS), false, resp.Payload)
        logger.Println("MQTTConnector.publisher() published to", topic)
    }
}


// stop disconnects from the broker if a client exists and is currently
// connected; otherwise it only logs the call.
func (c *MQTTConnector) stop() {
    logger.Println("MQTTConnector.stop()")

    // Nothing to tear down without a connected client.
    if c.client == nil || !c.client.IsConnected() {
        return
    }
    c.client.Disconnect(500)
}

// connect blocks until the client is connected to the broker, retrying with a
// growing delay. backOff is the delay in seconds slept before the first
// attempt. After a failed attempt the delay becomes 10 seconds if it was 0,
// and is doubled as long as it is still <= 600.
// It returns immediately if no client has been configured.
func (c *MQTTConnector) connect(backOff int) {
    if c.client == nil {
        logger.Printf("MQTTConnector.connect() client is not configured")
        return
    }

    for {
        logger.Printf("MQTTConnector.connect() connecting to the broker %v, backOff: %v sec\n", c.config.URL, backOff)
        time.Sleep(time.Duration(backOff) * time.Second)

        // The connection may have been established while we were sleeping.
        if c.client.IsConnected() {
            break
        }

        token := c.client.Connect()
        token.Wait()
        err := token.Error()
        if err == nil {
            break
        }
        logger.Printf("MQTTConnector.connect() failed to connect: %v\n", err.Error())

        // Grow the delay for the next attempt.
        switch {
        case backOff == 0:
            backOff = 10
        case backOff <= 600:
            backOff *= 2
        }
    }

    logger.Printf("MQTTConnector.connect() connected to the broker %v", c.config.URL)
}

// onConnected is installed as the client's on-connect handler. It subscribes,
// at defaultQoS, to every topic that is a key of subTopicsRvsd, routing
// incoming messages to messageHandler. With no SUB topics it only logs.
func (c *MQTTConnector) onConnected(client *MQTT.Client) {
    if len(c.subTopicsRvsd) == 0 {
        logger.Println("MQTTPulbisher.onConnected() no resources with SUB configured")
        return
    }

    logger.Println("MQTTPulbisher.onConnected() will (re-)subscribe to all configured SUB topics")

    // Build the topic -> QoS map expected by SubscribeMultiple.
    filters := make(map[string]byte, len(c.subTopicsRvsd))
    for topic := range c.subTopicsRvsd {
        logger.Printf("MQTTPulbisher.onConnected() will subscribe to topic %s", topic)
        filters[topic] = defaultQoS
    }
    client.SubscribeMultiple(filters, c.messageHandler)
}

// onConnectionLost is installed as the client's connection-lost handler. It
// logs the reason, replaces the client with a freshly configured one and
// starts a new background connection attempt without initial back-off.
func (c *MQTTConnector) onConnectionLost(client *MQTT.Client, reason error) {
    cause := reason.Error()
    logger.Println("MQTTPulbisher.onConnectionLost() lost connection to the broker: ", cause)

    // Auto-reconnect is disabled in the client options, so rebuild the client
    // and reconnect ourselves.
    c.configureMqttConnection()
    go c.connect(0)
}

// configureMqttConnection builds the client options from c.config and stores a
// new MQTT client in c.client.
//
// Username/password are applied only when both are non-empty. TLS is set up
// only when the broker URL starts with "ssl": an optional CA file is loaded as
// the root pool and an optional certificate/key pair is used for client
// authentication. Failures to load TLS material are logged, not returned.
func (c *MQTTConnector) configureMqttConnection() {
    cfg := c.config

    connOpts := MQTT.NewClientOptions()
    connOpts.AddBroker(cfg.URL)
    connOpts.SetClientID(c.clientID)
    connOpts.SetCleanSession(true)
    connOpts.SetConnectionLostHandler(c.onConnectionLost)
    connOpts.SetOnConnectHandler(c.onConnected)
    connOpts.SetAutoReconnect(false) // re-connection is handled by onConnectionLost

    // Username/password authentication
    if cfg.Username != "" && cfg.Password != "" {
        connOpts.SetUsername(cfg.Username)
        connOpts.SetPassword(cfg.Password)
    }

    // SSL/TLS
    if strings.HasPrefix(cfg.URL, "ssl") {
        tlsConfig := &tls.Config{}

        // Custom CA to authenticate a broker using a self-signed certificate
        if cfg.CaFile != "" {
            pem, err := ioutil.ReadFile(cfg.CaFile)
            switch {
            case err != nil:
                logger.Printf("MQTTConnector.configureMqttConnection() ERROR: failed to read CA file %s:%s\n", cfg.CaFile, err.Error())
            default:
                // The pool is installed before parsing, so it stays in place
                // (empty) even when the PEM data cannot be parsed.
                pool := x509.NewCertPool()
                tlsConfig.RootCAs = pool
                if !pool.AppendCertsFromPEM(pem) {
                    logger.Printf("MQTTConnector.configureMqttConnection() ERROR: failed to parse CA certificate %s\n", cfg.CaFile)
                }
            }
        }

        // Certificate-based client authentication
        if cfg.CertFile != "" && cfg.KeyFile != "" {
            cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
            if err == nil {
                tlsConfig.Certificates = []tls.Certificate{cert}
            } else {
                logger.Printf("MQTTConnector.configureMqttConnection() ERROR: failed to load client TLS credentials: %s\n",
                    err.Error())
            }
        }

        connOpts.SetTLSConfig(tlsConfig)
    }

    c.client = MQTT.NewClient(connOpts)
}

This code does exactly what I am looking for!

But as a newcomer to Go, I can't figure out how to run the start() function from my main function, or what arguments to pass!

And especially, how do I pass messages to the worker (publisher) using a channel?

Your help will be appreciated !

Upvotes: 0

Views: 3711

Answers (2)

Brits
Brits

Reputation: 18315

I posted the answer below on the github repo but as you have asked the same question here thought it was worth cross posting (with a bit more info).

When you say "passing messages in mass while doing high latency tasks" I assume that you mean that you want to send the messages asynchronously (so the message is handled by a different go-routine than your main code is running on).

If that is the case then a very simple change to your initial example will give you that:

for i := 0; i < 5; i++ {
    text := fmt.Sprintf("this is msg #%d!", i)
    // Fire and forget: token.Wait() is not called, so the loop does not block
    // until the message is sent. The returned token is intentionally not
    // stored, because an unused `token` variable would not compile.
    c.Publish("logs", 0, false, text)
}

Note: Your example code may exit before the messages are actually sent; adding time.Sleep(10 * time.Second) would give it time for these to go out; see the code below for another way of handling this

The only reason that your initial code stopped until the message was sent was that you called token.Wait(). If you don't care about errors (and you are not checking for them, so I assume you don't care) then there is little point in calling token.Wait() (it just waits until the message is sent; the message will go out whether you call token.Wait() or not).

If you want to log any errors you could use something like:

for i := 0; i < 5; i++ {
    text := fmt.Sprintf("this is msg #%d!", i)
    token := c.Publish("logs", 0, false, text)
    // Wait for completion on a separate goroutine so the publishing loop is
    // never blocked; the goroutine only reports a failure, if any.
    go func() {
        token.Wait()
        if err := token.Error(); err != nil {
            fmt.Printf("Error: %s\n", err.Error()) // or whatever you want to do with your error
        }
    }()
}

Note that there are a few more things that you need to do if message delivery is critical (but as you are not checking for errors I'm assuming it's not).

In terms of the code you found: I suspect that this would add complexity you don't need (and more info would be required to work this out; for example, the MqttProtocol struct is not defined within the bit you pasted).

Extra bit... In your comments you mentioned "Published messages must be ordered". If that is essential (so you want to wait until each message has been delivered before sending another) then you need something like:

// Buffered queue: up to 200 messages may be pending before a send blocks.
msgChan := make(chan string, 200)

var wg sync.WaitGroup
wg.Add(1)

// Single sender goroutine: messages leave the channel one at a time and each
// publish is awaited before the next one starts.
go func() {
    defer wg.Done()
    for msg := range msgChan {
        token := c.Publish("logs", 2, false, msg) // use QoS 2 if order is vital
        token.Wait()
        // should check for errors here
    }
}()

for i := 0; i < 5; i++ {
    msgChan <- fmt.Sprintf("this is msg #%d!", i)
}

// Closing the channel ends the sender's range loop once the queue is drained.
close(msgChan)

// Wait for all messages to be sent before exiting (may wait forever if the MQTT broker is down!).
wg.Wait()

Note: This is similar to the solution from Ilya Kaznacheev (if you set workerPoolSize to 1 and make the channel buffered)

As your comments indicate that the wait group makes this difficult to understand, here is another way of waiting that might be clearer (wait groups are generally used when you are waiting for multiple things to finish; in this example we are only waiting for one thing, so a simpler approach can be used):

// Buffered queue: up to 200 messages may be pending before a send blocks.
msgChan := make(chan string, 200)

// done is closed by the sender goroutine when it has finished.
done := make(chan struct{})

// Single sender goroutine: each publish is awaited before the next one starts.
go func() {
    defer close(done) // let the main routine know we have finished
    for msg := range msgChan {
        token := c.Publish("logs", 2, false, msg) // use QoS 2 if order is vital
        token.Wait()
        // should check for errors here
    }
}()

for i := 0; i < 5; i++ {
    msgChan <- fmt.Sprintf("this is msg #%d!", i)
}

// Closing the channel ends the sender's range loop once the queue is drained.
close(msgChan)

// Block until the sender goroutine has completed.
<-done

Upvotes: 5

Ilya Kaznacheev
Ilya Kaznacheev

Reputation: 168

Why don't you just split message sending into a bunch of workers?

Something like this:

...
    const workerPoolSize = 10 // the number of workers you want to have
    wg := &sync.WaitGroup{}
    wCh := make(chan string)
    wg.Add(workerPoolSize) // you want to wait for 10 workers to finish the job

    // run workers in goroutines
    for i := 0; i < workerPoolSize; i++ {
        go func(wch <-chan string) {
            // get the data from the channel
            for text := range wch {
                // Keep the token returned by Publish so that it can be waited
                // on (it was previously discarded, leaving `token` undefined).
                token := c.Publish("logs", 0, false, text)
                token.Wait()
            }
            wg.Done() // worker signals that it has finished the job
        }(wCh)
    }

    for i := 0; i < 5; i++ {
        // put the data to the channel
        wCh <- fmt.Sprintf("this is msg #%d!", i)
    }

    close(wCh) // no more data: lets every worker's range loop end
    wg.Wait()  // wait for all workers to finish
...

Upvotes: 0

Related Questions