The user with no hat

Reputation: 10846

How to reuse a listener/connection? Golang

I'm trying to connect a computer behind NAT to the internet through a 3rd-party server (a.k.a. a reverse connection). I'm listening on two ports: the machine behind NAT connects on one port (dstNet), and the internet clients connect on the other. The issue is that I don't know how to handle a disconnection of the machine behind NAT. Even if the machine connects again, the traffic is no longer handled (nothing gets sent/written); I just get [DEBUG] socks: Copied 0 bytes to client, which is my own warning, of course. Below is the code. It's quite long, but I can't find what to trim.

// Make a bridge between dstNet which is
// usually behind NAT and srcNet which is usually a client
// which wants to route the traffic through the NAT machine.
package main

import (
    "bufio"
    "errors"
    log "github.com/golang/glog"
    "io"
    "net"
    "time"
)

const (
    // listen on the dstNet so that we can
    // create a connection with the NAT client
    dstNet = "0.0.0.0:9000"
    // listen on srcNet so that we can get traffic
    // to forward to dstNet
    srcNet = "0.0.0.0:9001"
)

var errCh = make(chan error, 1)

// make a channel to send the reverse connections
var lrCh = make(chan net.Conn, 1)
func listenDst() {
    // Listen on the dstNet
    lr, err := net.Listen("tcp", dstNet)
    if err != nil {
        log.Error(err)
        errCh <- err
        return
    }
    // accept the connection
    for {
        lrConn, err := lr.Accept()
        if err != nil {
            log.Error(err)
            errCh <- err
            return
        }
            log.Errorf("sent connection")
        //  lrConn.SetReadDeadline(time.Now().Add(10 * time.Second))
            lrCh <- lrConn

    }

}

func main() {

    go func() {
        for err := range errCh {
            if err != nil {
                panic(err)
            }
        }
    }()
    // listen for the nat server
    go listenDst()

    // listen for clients to connect
    l, err := net.Listen("tcp", srcNet)
    if err != nil {
        log.Error(err)
        panic(err)
    }
    // accept the connection
    for {
        conn, err := l.Accept()
        if err != nil {
            log.Error(err)
            panic(err)
        }
        // serve the connection
        go func(conn net.Conn) {
            defer conn.Close()
            bufConn := bufio.NewReader(conn)
            dst := <-lrCh
            defer dst.Close()

            // Start proxying
            errCh2 := make(chan error, 2)
            go proxy("target", dst, bufConn, errCh2)
            go proxy("client", conn, dst, errCh2)

            // Wait
            var ei int
            for err := range errCh2 {
                switch {
                case err != nil && err.Error() == "no byte":
                    log.Error(err)
                case err != nil && err.Error() == "use of closed network connection":
                    // if the connection is closed we restart it.
                    log.Error(err)
                    // BUG() attempt to write again the bytes
                case err != nil:
                    log.Error(err)
                    errCh <- err
                }
                if ei == 1 {
                    log.Errorf("done with errors")
                    close(errCh2)
                }
                ei++

            }
        }(conn)

    }
}

// proxy is used to shuffle data from src to destination, and sends errors
// down a dedicated channel
func proxy(name string, dst io.Writer, src io.Reader, errCh2 chan error) {
    n, err := io.Copy(dst, src)
    // Log, and sleep. This is jank but allows the other side
    // to finish a pending copy
    log.Errorf("[DEBUG] socks: Copied %d bytes to %s", n, name)
    time.Sleep(10 * time.Millisecond)
    // Send any errors
    switch {
    case err != nil:
        log.Error(err)
        errCh2 <- err
    case n < 1:
        errCh2 <- errors.New("no byte")
    default:
        errCh2 <- nil
    }
    return
}

Upvotes: 2

Views: 5893

Answers (2)

The user with no hat

Reputation: 10846

It turned out that I had to restart the listener, not just close the connection. I've modified the broker function to reset the dstNet listener if it can't write to src (i.e. it writes 0 bytes). I'm still not sure this is the right way to do it (closing the listener seems bad in a multi-connection scenario, since I guess it resets all the client connections dialing on that address), but so far this is the best fix I could find.

if n == 0 {
    lrNewCh <- 1
}
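
For clarity, this is roughly what the other end of that signal does in listenDst below: a 1 on lrNewCh makes it drop the current listener and NAT-side connection and re-listen, anything else just hands the current NAT-side connection to the waiting client. This is only a condensed sketch of the code below, relying on the variables declared in listenDst, not standalone code:

// condensed from listenDst below (not standalone code)
status := <-lrNewCh
if status == 1 {
    // reset requested: drop the connection and the listener,
    // re-listen and wait for the NAT machine to come back
    lrConn.Close()
    lr.Close()
    lr, err = net.ListenTCP("tcp", dstNet)
    if err != nil {
        errCh <- err
        return
    }
    lrConn, err = lr.AcceptTCP()
} else {
    // normal case: hand the NAT-side connection to the waiting client
    lrCh <- lrConn
}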

Here is all the code. All the credit goes to @JimB

// Make a bridge between dstNet which is
// usually behind NAT and srcNet which is usually a client
// which wants to route the traffic through the NAT machine.
package main

import (
    log "github.com/golang/glog"
    "io"
    "net"
)

// listen on the dstNet so that we can
// create a connection with the NAT client
var dstNet *net.TCPAddr = &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 9000}

// listen on srcNet so that we can get traffic
// to forward to dstNet
var srcNet *net.TCPAddr = &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 9001}

var errCh = make(chan error, 1)

// make a channel to send the reverse connections
var lrCh = make(chan *net.TCPConn, 1)
var lrNewCh = make(chan int, 1)

func listenDst() {
    // Listen on the dstNet
    lr, err := net.ListenTCP("tcp", dstNet)
    if err != nil {
        log.Error(err)
        errCh <- err
        return
    }
    // accept the connection
    for {
        lrConn, err := lr.AcceptTCP()
        if err != nil {
            log.Error(err)
            //errCh <- err
            //return
        }
        status := <-lrNewCh
        log.Errorf("status request is %v", status)
        if status == 1 {
            log.Errorf("we close and restart the listener and the connection")
            if err = lrConn.Close(); err != nil {
                log.Error(err)
            }
            if err = lr.Close(); err != nil {
                log.Error(err)
            }
            lr, err = net.ListenTCP("tcp", dstNet)
            if err != nil {
                log.Error(err)
                errCh <- err
                return
            }
            lrConn, err = lr.AcceptTCP()
            if err != nil {
                log.Error(err)
                errCh <- err
            }
        } else {
            log.Errorf("new connection on its way")
            lrCh <- lrConn
        }

    }

}

func main() {

    go func() {
        for err := range errCh {
            if err != nil {
                panic(err)
            }
        }
    }()
    // listen for the nat server
    go listenDst()

    // listen for clients to connect
    l, err := net.ListenTCP("tcp", srcNet)
    if err != nil {
        log.Error(err)
        panic(err)
    }
    // accept the connection
    for {
        conn, err := l.AcceptTCP()
        if err != nil {
            log.Error(err)
            panic(err)
        }
        // serve the connection
        go func(conn *net.TCPConn) {
            defer conn.Close()
            lrNewCh <- 0
            dst := <-lrCh
            defer dst.Close()
            proxy(dst, conn)
        }(conn)

    }
}

func proxy(srvConn, cliConn *net.TCPConn) {
    // channels to wait on the close event for each connection
    serverClosed := make(chan struct{}, 1)
    clientClosed := make(chan struct{}, 1)

    go broker(srvConn, cliConn, clientClosed)
    go broker(cliConn, srvConn, serverClosed)

    // wait for one half of the proxy to exit, then trigger a shutdown of the
    // other half by calling CloseRead(). This will break the read loop in the
    // broker and allow us to fully close the connection cleanly without a
    // "use of closed network connection" error.
    var waitFor chan struct{}
    select {
    case <-clientClosed:
        // the client closed first and any more packets from the server aren't
        // useful, so we can optionally SetLinger(0) here to recycle the port
        // faster.
        srvConn.SetLinger(0)
        srvConn.CloseRead()
        waitFor = serverClosed
    case <-serverClosed:
        cliConn.CloseRead()
        waitFor = clientClosed
    }

    // Wait for the other connection to close.
    // This "waitFor" pattern isn't required, but gives us a way to track the
    // connection and ensure all copies terminate correctly; we can trigger
    // stats on entry and deferred exit of this function.
    <-waitFor
}

// This does the actual data transfer.
// The broker only closes the Read side.
func broker(dst, src net.Conn, srcClosed chan struct{}) {
    // We can handle errors in a finer-grained manner by inlining io.Copy (it's
    // simple, and we drop the ReaderFrom or WriterTo checks for
    // net.Conn->net.Conn transfers, which aren't needed). This would also let
    // us adjust buffersize.
    n, err := io.Copy(dst, src)
    log.Errorf(" %v bytes copied", n)
    if err != nil {
        log.Errorf("Copy error: %s", err)
        // errCh <- err
    }
    if err := src.Close(); err != nil {
        log.Errorf("Close error: %s", err)
        errCh <- err
    }
    if n == 0 {
        lrNewCh <- 1
    }
    srcClosed <- struct{}{}

}

Upvotes: 0

Mr_Pink

Reputation: 109378

The only time you can reuse a connection after an error is if it is a temporary condition.

if err, ok := err.(net.Error); ok && err.Temporary() {
}

If you are trying to proxy a TCP connection and there is any other error (checking for Temporary may not even be that useful), you need to drop the whole thing and start over. You have no idea what the state of the remote server is or how many packets are in flight or lost, and it's only going to cause more difficult bugs the harder you try. (Tip: don't hide concurrency or timing problems with a sleep; that just makes them harder to find in the long run.)
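
For illustration, here is a minimal, self-contained sketch (not from the gist below) of an accept loop that retries only on errors reporting themselves as temporary and otherwise gives up, so the caller can tear everything down and rebuild it; acceptLoop, the port and the backoff are placeholder choices:

package main

import (
    "log"
    "net"
    "time"
)

// acceptLoop keeps accepting as long as errors report themselves as
// temporary; any other error is returned so the caller can close the
// listener and start over (and the NAT side can re-dial).
func acceptLoop(ln net.Listener, handle func(net.Conn)) error {
    for {
        conn, err := ln.Accept()
        if err != nil {
            if ne, ok := err.(net.Error); ok && ne.Temporary() {
                log.Printf("temporary accept error, retrying: %v", err)
                time.Sleep(50 * time.Millisecond) // arbitrary backoff
                continue
            }
            return err // permanent: drop the whole thing and start over
        }
        go handle(conn)
    }
}

func main() {
    ln, err := net.Listen("tcp", "0.0.0.0:9000")
    if err != nil {
        log.Fatal(err)
    }
    log.Fatal(acceptLoop(ln, func(c net.Conn) { c.Close() }))
}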

Here is a much simpler proxy pattern for go if you want to reference it: https://gist.github.com/jbardin/821d08cb64c01c84b81a

func Proxy(srvConn, cliConn *net.TCPConn) {
    // channels to wait on the close event for each connection
    serverClosed := make(chan struct{}, 1)
    clientClosed := make(chan struct{}, 1)

    go broker(srvConn, cliConn, clientClosed)
    go broker(cliConn, srvConn, serverClosed)

    // wait for one half of the proxy to exit, then trigger a shutdown of the
    // other half by calling CloseRead(). This will break the read loop in the
    // broker and allow us to fully close the connection cleanly without a
    // "use of closed network connection" error.
    var waitFor chan struct{}
    select {
    case <-clientClosed:
        // the client closed first and any more packets from the server aren't
        // useful, so we can optionally SetLinger(0) here to recycle the port
        // faster.
        srvConn.SetLinger(0)
        srvConn.CloseRead()
        waitFor = serverClosed
    case <-serverClosed:
        cliConn.CloseRead()
        waitFor = clientClosed
    }

    // Wait for the other connection to close.
    // This "waitFor" pattern isn't required, but gives us a way to track the
    // connection and ensure all copies terminate correctly; we can trigger
    // stats on entry and deferred exit of this function.
    <-waitFor
}

// This does the actual data transfer.
// The broker only closes the Read side.
func broker(dst, src net.Conn, srcClosed chan struct{}) {
    // We can handle errors in a finer-grained manner by inlining io.Copy (it's
    // simple, and we drop the ReaderFrom or WriterTo checks for
    // net.Conn->net.Conn transfers, which aren't needed). This would also let
    // us adjust buffersize.
    _, err := io.Copy(dst, src)

    if err != nil {
        log.Printf("Copy error: %s", err)
    }
    if err := src.Close(); err != nil {
        log.Printf("Close error: %s", err)
    }
    srcClosed <- struct{}{}
}

Upvotes: 3
