L30

Reputation: 13

Sending to channel doesn't happen if select has default

I am working on a personal project that will run on a Raspberry Pi with some sensors attached to it.

The function that reads from the sensors and the function that handles the socket connection run in different goroutines. To send data on the socket as soon as it is read from the sensors, I create a chan []byte in the main function and pass it to the goroutines.

This is where my problem appears: if I do multiple writes in a row, only the first piece of data reaches the client and the rest never arrive. If I put a short time.Sleep in the sender function, all the data arrives correctly.

Here is a simplified version of the program:

package main

import (
    "net"
    "os"
    "sync"
    "time"
)

const socketName string = "./test_socket"

// create the socket and launch the accept client routine
func launchServerUDS(ch chan []byte) {
    if err := os.RemoveAll(socketName); err != nil {
        return
    }
    l, err := net.Listen("unix", socketName)
    if err != nil {
        return
    }
    go acceptConnectionRoutine(l, ch)
}

// accept incoming connection on the socket and
// 1) launch the routine to handle commands from the client
// 2) launch the routine to send data when the server reads from the sensors
func acceptConnectionRoutine(l net.Listener, ch chan []byte) {
    defer l.Close()
    for {
        conn, err := l.Accept()
        if err != nil {
            return
        }
        go commandsHandlerRoutine(conn, ch)
        go autoSendRoutine(conn, ch)

    }
}

// routine that sends data to the client
func autoSendRoutine(c net.Conn, ch chan []byte) {
    for {
        data := <-ch
        if string(data) == "exit" {
            return
        }
        c.Write(data)
    }
}

// handles the client connection and calls functions to execute commands
func commandsHandlerRoutine(c net.Conn, ch chan []byte) {
    for {
        buf := make([]byte, 1024)
        n, err := c.Read(buf)
        if err != nil {
            ch <- []byte("exit")
            break
        }
        // for now, for the sake of simplicity, only echo commands back to the client
        _, err = c.Write(buf[:n])
        if err != nil {
            ch <- []byte("exit")
            break
        }
    }
}

// write on the channel to the autosend routine so the data are written on the socket
func sendDataToClient(data []byte, ch chan []byte) {
    select {
    case ch <- data:
        // if I put a little sleep here, no problems
        // if I remove the sleep, only data1 is sent to the client
        // time.Sleep(1 * time.Millisecond)
    default:
    }
}

func dummyReadDataRoutine(ch chan []byte) {
    for {
        // read data from the sensors every 5 seconds
        time.Sleep(5 * time.Second)
        // read first data and send it
        sendDataToClient([]byte("dummy data1\n"), ch)
        // read second data and send it
        sendDataToClient([]byte("dummy data2\n"), ch)
        // read third data and send it
        sendDataToClient([]byte("dummy data3\n"), ch)
    }
}

func main() {
    ch := make(chan []byte)
    wg := sync.WaitGroup{}
    wg.Add(2)
    go dummyReadDataRoutine(ch)
    go launchServerUDS(ch)
    wg.Wait()
}

I don't think it's correct to use a sleep to synchronize writes. How do I fix this while keeping the functions running on different goroutines?

Upvotes: 0

Views: 2198

Answers (2)

L30

Reputation: 13

JimB gave a good explanation, so I think his answer is the better one.

I have included my partial solution in this answer.

I thought my code was clear and simplified, but as Jim said, it can be made simpler and clearer. I am leaving my old code posted so people can see how to post simpler code and avoid the mess I made.

As chmike said, my issue wasn't related to the socket as I had thought; it was only related to the channel. Writing to an unbuffered channel was one of the problems. After changing the unbuffered channel to a buffered one, the issue was resolved. Anyway, this code is not "good code" and can be improved by following the principles that JimB has written in his answer.

So here is the new code:

package main

import (
    "net"
    "os"
    "sync"
    "time"
)

const socketName string = "./test_socket"

// create the socket and accept clients connections
func launchServerUDS(ch chan []byte, wg *sync.WaitGroup) {
    defer wg.Done()
    if err := os.RemoveAll(socketName); err != nil {
        return
    }
    l, err := net.Listen("unix", socketName)
    if err != nil {
        return
    }
    defer l.Close()
    for {
        conn, err := l.Accept()
        if err != nil {
            return
        }
        // these goroutines are launched when a client connects
        // routine that listens for commands and echoes them back
        go commandsHandlerRoutine(conn, ch)
        // routine to send data read from the sensors to the client
        go autoSendRoutine(conn, ch)
    }
}

// routine that sends data to the client
func autoSendRoutine(c net.Conn, ch chan []byte) {
    for {
        data := <-ch
        if string(data) == "exit" {
            return
        }
        c.Write(data)
    }
}

// handle commands received from the client
func commandsHandlerRoutine(c net.Conn, ch chan []byte) {
    for {
        buf := make([]byte, 1024)
        n, err := c.Read(buf)
        if err != nil {
            // if I can't read, send an exit command to autoSendRoutine and exit
            ch <- []byte("exit")
            break
        }
        // for now, for the sake of simplicity, only echo commands back to the client
        _, err = c.Write(buf[:n])
        if err != nil {
            // if I can't write back, send an exit command to autoSendRoutine and exit
            ch <- []byte("exit")
            break
        }
    }
}

// this goroutine reads from the sensors and writes to the channel, so data is sent
// to the client if a client is connected
func dummyReadDataRoutine(ch chan []byte, wg *sync.WaitGroup) {
    x := 0
    for x < 100 {
        // read data from the sensors every second
        time.Sleep(1 * time.Second)
        // read first data and send it
        ch <- []byte("data1\n")
        // read second data and send it
        ch <- []byte("data2\n")
        // read third data and send it
        ch <- []byte("data3\n")
        x++
    }
    wg.Done()
}


func main() {
    // create a BUFFERED CHANNEL
    ch := make(chan []byte, 1)
    wg := sync.WaitGroup{}
    wg.Add(2)
    // launch the goroutines that handle the socket connections
    // and read data from the sensors
    go dummyReadDataRoutine(ch, &wg)
    go launchServerUDS(ch, &wg)
    wg.Wait()
}

Upvotes: 0

Mr_Pink

Reputation: 109377

The primary problem was in the function:

func sendDataToClient(data []byte, ch chan []byte) {
    select {
    case ch <- data:
        // if I put a little sleep here, no problems
        // if I remove the sleep, only data1 is sent to the client
        // time.Sleep(1 * time.Millisecond)
    default:
    }
}

If the channel ch isn't ready at the moment the function is called, the default case will be taken and the data will never be sent. In this case you should eliminate the function and send to the channel directly.
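To make the dropped sends concrete, here is a minimal sketch. The helper name trySend is hypothetical (it is not in the question's code); it has the same select/default shape as sendDataToClient but reports whether the value was actually handed over. With an unbuffered channel, a send can only proceed while a receiver is waiting on the channel. That likely explains the observed behavior: after receiving data1 the receiver is busy in c.Write, so the next two sends fall through to default.

```go
package main

import "fmt"

// trySend is a hypothetical helper mirroring sendDataToClient:
// it attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan<- []byte, data []byte) bool {
	select {
	case ch <- data:
		return true
	default:
		// No receiver (and no free buffer slot) right now: data is dropped.
		return false
	}
}

func main() {
	ch := make(chan []byte) // unbuffered, and nobody is receiving yet

	// The non-blocking send is dropped because no receiver is waiting.
	fmt.Println(trySend(ch, []byte("data1\n")))

	done := make(chan struct{})
	go func() {
		defer close(done)
		fmt.Printf("%s", <-ch)
	}()

	// A plain send blocks until the receiver is ready, so nothing is lost.
	ch <- []byte("data2\n")
	<-done
}
```

Sending with a plain `ch <- data`, as in the second half of main, makes the sender wait for the receiver instead of silently discarding data.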

Buffering the channel is orthogonal to the problem at hand, and should be done for reasons similar to those for buffered IO, i.e. to provide a "buffer" for writes that can't immediately progress. If the code is not able to progress without a buffer, adding one only delays possible deadlocks.

You also don't need the exit sentinel value here: you could range over the channel and close it when you're done. This still ignores write errors, however, and handling them requires some redesign.
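As a rough illustration of why a buffer only postpones the problem, the sketch below counts how many non-blocking sends a channel accepts when nothing is receiving. The function name fill and the buffer sizes are made up for this example. The count equals the buffer capacity, after which a send either drops (with default) or blocks (without it).

```go
package main

import "fmt"

// fill is a hypothetical helper: it performs non-blocking sends until
// the channel refuses one, and returns how many sends were accepted.
func fill(ch chan<- int, max int) int {
	for i := 0; i < max; i++ {
		select {
		case ch <- i:
			// accepted into the buffer
		default:
			// buffer full and no receiver: stop here
			return i
		}
	}
	return max
}

func main() {
	for _, size := range []int{0, 1, 3} {
		ch := make(chan int, size)
		fmt.Printf("buffer %d accepted %d sends with no receiver\n", size, fill(ch, 10))
	}
}
```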

for data := range ch {
    c.Write(data)
}

You should also be careful passing slices over channels, as it's all too easy to lose track of which logical process has ownership and is going to modify the backing array. I can't say from the information given whether passing the read and write data over channels improves the architecture, but this is not a pattern you will find in most Go networking code.
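The ownership hazard can be shown with a small sketch. The helper cloneBytes is hypothetical; it assumes the sender wants to keep reusing its buffer and therefore hands the receiver an independent copy.

```go
package main

import "fmt"

// cloneBytes is a hypothetical helper that returns a copy of b
// with its own backing array.
func cloneBytes(b []byte) []byte {
	return append([]byte(nil), b...)
}

func main() {
	ch := make(chan []byte, 2)
	buf := []byte("data1")

	ch <- buf             // shares its backing array with buf
	ch <- cloneBytes(buf) // independent copy

	// The sender reuses its buffer before the receiver has looked at it.
	buf[4] = '2'

	fmt.Printf("%s\n", <-ch) // the shared slice sees the change: data2
	fmt.Printf("%s\n", <-ch) // the copy is unaffected: data1
}
```

Copying costs an allocation per message. The alternative is a clear convention that the sender never touches a slice again after sending it.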

Upvotes: 2
