Dany
Dany

Reputation: 2802

How to listen to a client continiously using gob in Golang

In my use case I would like to continuously listen to a TCP connection and receive the value. The expected value is an object. So I am using gob decoder to receive the value from the connection. I would like to continuously listen to the connection and receive the object using go routines. I have the code snippet here[It is part of the application. code snippet does not compile]. It is getting value for the first time but not receiving for the subsequent objects.

func main() {

    //...
    // SOME CODE
    //...


    // All hosts who are connected; a map wherein
    // the keys are ip addreses and the values are
    //net.Conn objects
    allClients := make(map[string]net.Conn)

    tMaps := make(chan map[string]int64)

    for {
            select {
            // Accept new clients
            //
            case conn := <-newConnections:
            log.Printf("Accepted new client, #%s", hostIp)

            // Constantly read incoming messages from this
            // client in a goroutine and push those onto
            // the tMaps channel for broadcast to others.
            //
            go func(conn net.Conn) {
                    dec := gob.NewDecoder(conn)
                    for {
                            var tMap map[string]int64
                            err := dec.Decode(&tMap)
                            if err != nil {
                                    fmt.Println("Error in decoding ", err)
                                    break
                            }
                            log.Printf("Received values: %+v", tMap)
                            //update throttle map based on the received value
                            tMaps <- throttleMap
                    }

            }(conn)
    }
}

Could anyone help me on this?

Upvotes: 2

Views: 4015

Answers (1)

Todd McLeod
Todd McLeod

Reputation: 202

Let's look at the basics of a TCP server in Go.

First, there is the "listening" part. We can set that up like this:

package main

import (
    "fmt"
    "io"
    "net"
    "time"
)

func main() {
    ln, err := net.Listen("tcp", ":9000")
    if err != nil {
        panic(err)
    }
    defer ln.Close()

    for {
        conn, err := ln.Accept()
        if err != nil {
            panic(err)
        }

        io.WriteString(conn, fmt.Sprint("Hello World\n", time.Now(), "\n"))

        conn.Close()
    }
}

Notice the infinite for loop. It is always running and looping over that code. What does the code that is being looped over do? If a connection comes in on the port which is being listened on, then that connection is accepted. We then do something with that connection. In this case, we write back to it with io.WriteString. To this one connection, we are sending a response. We then close the connection. And if there are more connections, we're ready to accept them.

Now let's create a client to connect to the TCP server. This is known as "dialing" in to the TCP server.

To run all of this code on your machine, run the TCP server code above. To run the code, go to your terminal and enter: go run main.go

Now put the code directly below into another file. Launch another tab in your terminal. Run that code also by entering: go run main.go

The code below which "dials" in to your TCP server will connect to the server and the TCP server will respond, then close the connection.

Here is the code for dialing into a TCP server as a client:

package main

import (
    "fmt"
    "io/ioutil"
    "net"
)

func main() {
    conn, err := net.Dial("tcp", "localhost:9000")
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    bs, _ := ioutil.ReadAll(conn)
    fmt.Println(string(bs))

}

We can take these basics and start having fun.

Let's create an "echo" server.

This will illustrate accepting many connections.

package main

import (
    "io"
    "net"
)

func main() {
    ln, err := net.Listen("tcp", ":9000")
    if err != nil {
        panic(err)
    }
    defer ln.Close()

    for {
        conn, err := ln.Accept()
        if err != nil {
            panic(err)
        }

        // handles unlimited connections
        go func() {
            io.Copy(conn, conn)
            conn.Close()
        }()
    }
}

Run the file above the same way as before: go run main.go

If you get an error, make sure you have closed the TCP server we were running from the previous example. You close the TCP server with ctrl+c in the terminal.

Now that your new TCP server is running, let's connect to it using Telnet.

On windows you can install telnet; on Mac, it should already be there. Use this command to run telnet and connect to your TCP server: telnet localhost 9000

Now for one more example - an in-memory database like Redis:

package main

import (
    "bufio"
    "fmt"
    "io"
    "log"
    "net"
    "strings"
)

var data = make(map[string]string)

func handle(conn net.Conn) {
    defer conn.Close()

    scanner := bufio.NewScanner(conn)
    for scanner.Scan() {
        ln := scanner.Text()
        fs := strings.Fields(ln)

        if len(fs) < 2 {
            io.WriteString(conn, "This is an in-memory database \n" +
            "Use SET, GET, DEL like this: \n" +
            "SET key value \n" +
            "GET key \n" +
            "DEL key \n\n" +
            "For example - try these commands: \n" +
            "SET fav chocolate \n" +
            "GET fav \n\n\n")
            continue
        }

        switch fs[0] {
        case "GET":
            key := fs[1]
            value := data[key]
            fmt.Fprintf(conn, "%s\n", value)
        case "SET":
            if len(fs) != 3 {
                io.WriteString(conn, "EXPECTED VALUE\n")
                continue
            }
            key := fs[1]
            value := fs[2]
            data[key] = value
        case "DEL":
            key := fs[1]
            delete(data, key)
        default:
            io.WriteString(conn, "INVALID COMMAND "+fs[0]+"\n")
        }
    }
}

func main() {
    li, err := net.Listen("tcp", ":9000")
    if err != nil {
        log.Fatalln(err)
    }
    defer li.Close()

    for {
        conn, err := li.Accept()
        if err != nil {
            log.Fatalln(err)
        }
        handle(conn)
    }
}

And adding in concurrency:

package main

import (
    "bufio"
    "fmt"
    "io"
    "log"
    "net"
    "strings"
)

type Command struct {
    Fields []string
    Result chan string
}

func redisServer(commands chan Command) {
    var data = make(map[string]string)
    for cmd := range commands {
        if len(cmd.Fields) < 2 {
            cmd.Result <- "Expected at least 2 arguments"
            continue
        }

        fmt.Println("GOT COMMAND", cmd)

        switch cmd.Fields[0] {
        // GET <KEY>
        case "GET":
            key := cmd.Fields[1]
            value := data[key]

            cmd.Result <- value

        // SET <KEY> <VALUE>
        case "SET":
            if len(cmd.Fields) != 3 {
                cmd.Result <- "EXPECTED VALUE"
                continue
            }
            key := cmd.Fields[1]
            value := cmd.Fields[2]
            data[key] = value
            cmd.Result <- ""
        // DEL <KEY>
        case "DEL":
            key := cmd.Fields[1]
            delete(data, key)
            cmd.Result <- ""
        default:
            cmd.Result <- "INVALID COMMAND " + cmd.Fields[0] + "\n"
        }
    }
}

func handle(commands chan Command, conn net.Conn) {
    defer conn.Close()

    scanner := bufio.NewScanner(conn)
    for scanner.Scan() {
        ln := scanner.Text()
        fs := strings.Fields(ln)

        result := make(chan string)
        commands <- Command{
            Fields: fs,
            Result: result,
        }

        io.WriteString(conn, <-result+"\n")
    }

}

func main() {
    li, err := net.Listen("tcp", ":9000")
    if err != nil {
        log.Fatalln(err)
    }
    defer li.Close()

    commands := make(chan Command)
    go redisServer(commands)

    for {
        conn, err := li.Accept()
        if err != nil {
            log.Fatalln(err)
        }

        go handle(commands, conn)
    }
}

See my lectures from my CSUF class describing all of this here. And one more great resource.

Upvotes: 4

Related Questions