KiYugadgeter
KiYugadgeter

Reputation: 4012

Why this websocket server does not broadcast to connected client?

Below code is a websocket broadcast server that read from a specific connection then broadcasting it to connected clients.

But this server does not broadcast despite there is no error and warnings. Why this server does not broadcast?

In this code self.KabucomConn is origin socket so read from this socket, then broadcast to client which stored in Hub.RClients.

When new connection established, passing connection object with register channel to Hub, then Hub adds a client to RClient that stored client object.

package main

import (
    "log"
    "net/http"
    "net/url"

    "github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool {
        return true
    },
}

type Client struct {
    hub  *Hub
    Conn *websocket.Conn
    Send chan []byte
}

func (self *Client) writepump() {
    for {
        select {
        case message := <-self.Send:
            w, err := self.Conn.NextWriter(websocket.TextMessage)
            if err != nil {
                log.Print("writepump: nextwriter error")
            }
            w.Write(message)
            w.Close()
        }
    }
}

type Hub struct {
    RClients    map[*Client]bool
    KabucomConn *websocket.Conn
    register    chan *Client
    unregister  chan *Client
    kabucomchan chan []byte
    url         url.URL
}

func (self *Hub) KabucomRun() {
    for {
        _, b, err := self.KabucomConn.ReadMessage() // read data from origin data connection
        log.Println("read message")
        if err != nil {
            log.Println(err)
            self.KabucomConn.Close()
            for i := 1; i <= 5; i++ { //retry re-connect up to 5 times
                self.KabucomConn, _, err = websocket.DefaultDialer.Dial(self.url.String(), nil)
                if i >= 5 && err != nil {
                    log.Fatal(err)
                } else if err != nil {
                    log.Println(err, "try", i)
                    continue
                } else {
                    break
                }
            }
            log.Println("conti")
            continue
        }
        log.Println(b)
        self.kabucomchan <- b
    }
}

func (self *Hub) Run() {
    defer func() {
        for c, _ := range self.RClients {
            close(c.Send)
            c.Conn.Close()
        }
    }()
    for {
        select {
        case message := <-self.kabucomchan:
            log.Println("kabucomchan")
            log.Println(message)
            for c, _ := range self.RClients {
                c.Send <- message
            }
        case c := <-self.register:
            log.Println("reg")
            self.RClients[c] = true

        case c := <-self.unregister:
            log.Println("unreg")
            delete(self.RClients, c)
            close(c.Send)
        }
    }
}

func newHub() *Hub {
    u := url.URL{Scheme: "ws", Host: "192.168.1.8:20063", Path: "/ws"}
    var conn *websocket.Conn
    for i := 1; i <= 5; i++ {
        d, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
        if err != nil && i < 5 {
            log.Println(err)
            continue
        } else if i >= 5 {
            log.Println("Hub: Kabucom connection error")
        }
        conn = d
        break
    }
    return &Hub{RClients: make(map[*Client]bool), register: make(chan *Client), KabucomConn: conn}
}

func handler(w http.ResponseWriter, r *http.Request, hub *Hub) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        return
    }
    client := &Client{Conn: conn, hub: hub, Send: make(chan []byte, 256)}
    go client.writepump()
    hub.register <- client
}

func main() {
    hub := newHub()
    go hub.Run()
    go hub.KabucomRun()
    http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
        handler(w, r, hub)
    })
    log.Println(":20021/ws")
    http.ListenAndServe(":20021", nil)
}
  

Upvotes: 1

Views: 331

Answers (1)

blackgreen
blackgreen

Reputation: 45041

Because you are not initializing Hub.kabucomchan.

func newHub() *Hub {
    //...
    return &Hub{RClients: make(map[*Client]bool), register: make(chan *Client), KabucomConn: conn, /* kabucomchan ??? */}
}

Send and receive operations on channels assume that both sender c<- and receiver <-c hold a reference to the same channel, but when the channel is nil this reference doesn't exist, and the send and receive just block forever.

Properly initialize the channel in the Hub constructor:

return &Hub{
   RClients: make(map[*Client]bool), 
   register: make(chan *Client), 
   KabucomConn: conn,
   kabucomchan: make(chan []byte, /* buffered? */), // <--- !
}

Upvotes: 1

Related Questions