rizpng
rizpng

Reputation: 51

TCP connection pool

i'm new to socket and trying to create a connection pooling over tcp socket. my implementation send 32bit length then binary message for each call. But i'm having problem with sometimes the reader receiving previous response from server (could happened when client close and re-establish socket on send error). how do i flush socket (remaining bytes from previous call) before a new request. any suggestion?

Edit: i learned that tcp always stream 0s, what if i send byte(1) before message so i can have a flush function to check if socket not empty before a new call.

Upvotes: 1

Views: 13839

Answers (1)

markc
markc

Reputation: 121

Your post actually asks several questions:

  • How to manage a connection pool?
  • How to handle communication over the sockets?

These are really two different things. A connection pool is just a way to manage a set of connections. A simple way to implement this is with a class such as:

    package netpool

    import (
        "net"
    )

    const MaxConnections = 3

    type Error string

    func (e Error) Error() string {
        return string(e)
    }

     var ErrMaxConn = Error("Maximum connections reached")

    type Netpool struct {
        name  string
        conns int
        free  []net.Conn
    }

    func NewNetpool(name string) *Netpool {
        return &Netpool{
            name: name,
        }
    }

    func (n *Netpool) Open() (conn net.Conn, err error) {
        if n.conns >= MaxConnections && len(n.free) == 0 {
            return nil, ErrMaxConn
        }

        if len(n.free) > 0 {
            // return the first free connection in the pool
            conn = n.free[0]
            n.free = n.free[1:]
        } else {
            addr, err := net.ResolveTCPAddr("tcp", n.name)
            if err != nil {
                return nil, err
            }
            conn, err = net.DialTCP("tcp", nil, addr)
            if err != nil {
                return nil, err
            }
            n.conns += 1
        }
        return conn, err
    }

    func (n *Netpool) Close(conn net.Conn) error {
        n.free = append(n.free, conn)
        return nil
    }

I have created a stand-alone class here. It would typically be implemented as part of a higher-level class such as MyHTTPHost, or MyDatabase.

In this simple implementation, connections that are returned via netpool.Open() are not tracked. It's possible to leak connections by calling Open(), then closing the connections outside of netpool.Close(). It's possible to track them if you want to hold an active and inactive pool, for example, which would solve this problem.

A couple of other things you might want to add to a pooling implementation:

  • Threading protection (using sync.Mutex, for example)
  • Closing of connections in the freepool after some length of inactivity
  • Error checking to be sure that closed connections are still valid

Once you have a connection, you can call Read and Write on it normally. To flush all oustanding data on the socket, you can simply use the ioutil.ReadAll() helper function. By default, this will block indefinitely if there is no data available. To avoid that, add a read timeout using:

    conn.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
    _, err = ioutil.ReadAll(conn)
    neterr, ok := err.(net.Error)
    if ok && neterr.Timeout() {
        err = nil // timeout isn't an error in this case
    }
    if err != nil {
        // handle the error case.
    }

This will read all the data from the given connection if any is pending, or will return after 500ms with an I/O Timeout error if no data was pending.

The type assertion is required because ioutil.ReadAll() returns an Error interface, rather than a net.Error interface, and we need the latter to be able to easily find out if the call returned due to a timeout.

Upvotes: 12

Related Questions