hitchhiker

Reputation: 1319

How to communicate between multiple goroutines with for loops with blocking function calls inside one of them

I'm writing a Go app which accepts a websocket connection, then starts:

  1. A listen goroutine, which reads client messages from the connection and, based on each message, sends a response to updateClient via a channel.
  2. An updateClient goroutine, which writes to the connection.
  3. A processExternalData goroutine, which receives data from a message queue and sends it to updateClient via a channel so that updateClient can forward it to the client.

I'm using the gorilla library for websocket connections, and its read call is blocking. In addition, neither its read nor its write methods support concurrent calls, which is the main reason I have the updateClient goroutine: it is the only goroutine that calls the write method.

The problem arises when I need to close the connection which can happen at least in 2 cases:

  1. The client closed the connection or error occurred during read.
  2. processExternalData finished, there's no more data to update the client and the connection should be closed.

So updateClient needs to somehow notify listen to quit and vice versa listen needs to somehow notify updateClient to quit. updateClient has a quit channel inside select but listen can't have select because it already has a for loop with blocking read call inside.

So I added an isJobFinished field to the connection type and used it as the loop condition in listen:

type WsConnection struct {
    connection    *websocket.Conn
    writeChan     chan messageWithCb
    quitChan      chan bool
    isJobFinished bool
    userID        string
}

func processExternalData() {
    // receive data from message queue
    // send it to client via writeChan
}

func (conn *WsConnection) listen() {
    defer func() {
        conn.connection.Close()
        conn.quitChan <- true
    }()

    // keep the loop for communication with client
    for !conn.isJobFinished {
        _, message, err := conn.connection.ReadMessage()
        if err != nil {
            log.Println("read:", err)
            break
        }
        // convert message to type messageWithCb
        switch clientMessage.MessageType {
        case userNotFound:
            conn.writeChan <- messageWithCb{
                message: map[string]interface{}{
                    "type":    user,
                    "payload": false,
                },
            }
        default:
            log.Printf("Unknown message type received: %v", clientMessage)
        }
    }
    log.Println("end of listen")
}

func updateClient(w http.ResponseWriter, req *http.Request) {
    upgrader.CheckOrigin = func(req *http.Request) bool {
        return true
    }
    connection, err := upgrader.Upgrade(w, req, nil)
    if err != nil {
        log.Print("upgrade:", err)
        return
    }
    wsConn := &WsConnection{
        connection: connection,
        writeChan:  make(chan messageWithCb),
        quitChan:   make(chan bool),
    }
    go wsConn.listen()
    for {
        select {
        case msg := <-wsConn.writeChan:
            err := connection.WriteJSON(msg.message)
            if err != nil {
                log.Println("connection.WriteJSON error: ", err)
            }
            if wsConn.isJobFinished {
                if msg.callback != nil {
                    msg.callback() // sends on `wsConn.quitChan` from a goroutine
                }
            }
        case <-wsConn.quitChan:
            // clean up
            wsConn.connection.Close()
            close(wsConn.writeChan)
            return
        }
    }
}

I'm wondering whether a better pattern exists in Go for such cases. Specifically, I'd like to have a quit channel inside listen as well, so that updateClient can notify it to quit instead of maintaining the isJobFinished field. In this case there's no danger in leaving isJobFinished unprotected because only one method writes to it, but if the logic gets more complicated, having to protect the field inside the for loop in listen will probably hurt performance.

Also, I can't close quitChan because both listen and updateClient use it, and neither has a way to know whether the other has already closed it.

Upvotes: 0

Views: 861

Answers (1)

Thundercat

Reputation: 120941

Close the connection to break the listen goroutine out of the blocking read call.

In updateClient, add a defer statement to close the connection and clean up other resources. Return from the function on any error or a notification from the quit channel:

func updateClient(w http.ResponseWriter, req *http.Request) {
    upgrader.CheckOrigin = func(req *http.Request) bool {
        return true
    }
    connection, err := upgrader.Upgrade(w, req, nil)
    if err != nil {
        log.Print("upgrade:", err)
        return
    }
    defer connection.Close() // <--- Add this line
    wsConn := &WsConnection{
        connection: connection,
        writeChan:  make(chan messageWithCb),
        quitChan:   make(chan bool),
    }
    defer close(wsConn.writeChan) // <-- cleanup moved out of loop below.
    go wsConn.listen()
    for {
        select {
        case msg := <-wsConn.writeChan:
            err := connection.WriteJSON(msg.message)
            if err != nil {
                log.Println("connection.WriteJSON error: ", err)
                return
            }
        case <-wsConn.quitChan:
            return
        }
    }
}

In the listen function, loop until reading from the connection returns an error. A read on the connection returns immediately with an error when updateClient closes the connection.
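The effect of closing a connection on a blocked reader can be seen without gorilla at all. The sketch below uses net.Pipe from the standard library as a stand-in for the websocket connection (that substitution is an assumption made for illustration, and readUntilError and closeUnblocksRead are hypothetical names). A goroutine blocks in Read, the connection is closed from another goroutine, and the read returns an error:

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// readUntilError stands in for the listen loop: it reads until Read fails
// and then reports the error on done.
func readUntilError(c net.Conn, done chan<- error) {
	buf := make([]byte, 64)
	for {
		if _, err := c.Read(buf); err != nil {
			done <- err
			return
		}
	}
}

// closeUnblocksRead closes a connection while another goroutine is reading
// from it. It reports whether the reader stopped within one second and the
// error the reader observed.
func closeUnblocksRead() (unblocked bool, err error) {
	client, server := net.Pipe() // in-memory connection, a stand-in for the websocket
	defer client.Close()

	done := make(chan error, 1)
	go readUntilError(server, done)

	time.Sleep(10 * time.Millisecond) // give the reader time to block in Read
	server.Close()                    // what the deferred Close in updateClient would do

	select {
	case err := <-done:
		return true, err
	case <-time.After(time.Second):
		return false, nil
	}
}

func main() {
	unblocked, err := closeUnblocksRead()
	fmt.Println("reader unblocked:", unblocked, "error:", err)
}
```

The same idea applies to the websocket connection: the reading goroutine needs no quit channel of its own, because the read error is the notification.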

To prevent listen from blocking forever in the case where updateClient returns first, close the quit channel instead of sending a value.

func (conn *WsConnection) listen() {
    defer func() {
        conn.connection.Close()
        close(conn.quitChan) // <-- close instead of sending value
    }()

    // keep the loop for communication with client
    for {
        _, message, err := conn.connection.ReadMessage()
        if err != nil {
            log.Println("read:", err)
            break
        }
        // convert message to type messageWithCb
        switch clientMessage.MessageType {
        case userNotFound:
            conn.writeChan <- messageWithCb{
                message: map[string]interface{}{
                    "type":    user,
                    "payload": false,
                },
            }
        default:
            log.Printf("Unknown message type received: %v", clientMessage)
        }
    }
    log.Println("end of listen")
}

The field isJobFinished is not needed.
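The difference between sending on the quit channel and closing it shows up when nobody is receiving anymore, which is the situation listen is in after updateClient has returned. Here is a minimal sketch of that case using only the standard library (finishesWithin, signalBySend and signalByClose are hypothetical helper names, and the 50 ms timeout is an arbitrary choice):

```go
package main

import (
	"fmt"
	"time"
)

// finishesWithin reports whether signal returns before the timeout expires.
func finishesWithin(d time.Duration, signal func()) bool {
	done := make(chan struct{})
	go func() {
		signal()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}

// signalBySend sends on an unbuffered quit channel that has no receiver.
// The send never completes, so the sending goroutine is stuck for good.
func signalBySend() bool {
	quit := make(chan bool)
	return finishesWithin(50*time.Millisecond, func() { quit <- true })
}

// signalByClose closes the quit channel instead. close never blocks, and a
// later receive on the closed channel returns immediately.
func signalByClose() bool {
	quit := make(chan bool)
	ok := finishesWithin(50*time.Millisecond, func() { close(quit) })
	<-quit // returns at once with the zero value because quit is closed
	return ok
}

func main() {
	fmt.Println("send returned: ", signalBySend())
	fmt.Println("close returned:", signalByClose())
}
```

The program reports false for the send and true for the close. The close is safe in the answer's code because listen is the only place that closes quitChan.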

One problem with the code in the question and in this answer is that closing writeChan is not coordinated with sends to the channel. I cannot comment on a solution to this problem without seeing the processExternalData function.
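For orientation only, one common way to avoid sending on a closed channel is to never close the data channel and to have every sender select on a separate done channel that the writer closes when it returns. The sketch below assumes that approach. The done channel, trySend and runOnce are hypothetical names that do not appear in the question, and whether this fits depends on how processExternalData actually sends:

```go
package main

import "fmt"

// trySend delivers msg on writeChan unless done is closed first.
// It returns false when the writer has gone away, so the caller can stop.
func trySend(writeChan chan<- string, done <-chan struct{}, msg string) bool {
	select {
	case writeChan <- msg:
		return true
	case <-done:
		return false
	}
}

// runOnce starts a writer that accepts a single message and then exits,
// and reports the results of two consecutive sends.
func runOnce() (first, second bool) {
	writeChan := make(chan string) // never closed, so a send cannot panic
	done := make(chan struct{})    // closed by the writer when it returns

	go func() {
		defer close(done)
		<-writeChan // stand-in for writing one message to the connection
	}()

	first = trySend(writeChan, done, "hello")
	second = trySend(writeChan, done, "too late")
	return first, second
}

func main() {
	first, second := runOnce()
	fmt.Println(first, second) // prints "true false"
}
```

With this shape a sender that arrives after the writer has exited gets false back instead of panicking or blocking forever.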

It may make sense to use a mutex instead of a goroutine to limit write concurrency. Again, the code in the processExternalData function is required to comment further on this topic.

Upvotes: 1
