Reputation: 4012
Below code is a websocket broadcast server that read from a specific connection then broadcasting it to connected clients.
But this server does not broadcast despite there is no error and warnings. Why this server does not broadcast?
In this code self.KabucomConn
is origin socket so read from this socket, then broadcast to client which stored in Hub.RClients
.
When new connection established, passing connection object with register channel to Hub
, then Hub
adds a client to RClient
that stored client object.
package main
import (
"log"
"net/http"
"net/url"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type Client struct {
hub *Hub
Conn *websocket.Conn
Send chan []byte
}
func (self *Client) writepump() {
for {
select {
case message := <-self.Send:
w, err := self.Conn.NextWriter(websocket.TextMessage)
if err != nil {
log.Print("writepump: nextwriter error")
}
w.Write(message)
w.Close()
}
}
}
type Hub struct {
RClients map[*Client]bool
KabucomConn *websocket.Conn
register chan *Client
unregister chan *Client
kabucomchan chan []byte
url url.URL
}
func (self *Hub) KabucomRun() {
for {
_, b, err := self.KabucomConn.ReadMessage() // read data from origin data connection
log.Println("read message")
if err != nil {
log.Println(err)
self.KabucomConn.Close()
for i := 1; i <= 5; i++ { //retry re-connect up to 5 times
self.KabucomConn, _, err = websocket.DefaultDialer.Dial(self.url.String(), nil)
if i >= 5 && err != nil {
log.Fatal(err)
} else if err != nil {
log.Println(err, "try", i)
continue
} else {
break
}
}
log.Println("conti")
continue
}
log.Println(b)
self.kabucomchan <- b
}
}
func (self *Hub) Run() {
defer func() {
for c, _ := range self.RClients {
close(c.Send)
c.Conn.Close()
}
}()
for {
select {
case message := <-self.kabucomchan:
log.Println("kabucomchan")
log.Println(message)
for c, _ := range self.RClients {
c.Send <- message
}
case c := <-self.register:
log.Println("reg")
self.RClients[c] = true
case c := <-self.unregister:
log.Println("unreg")
delete(self.RClients, c)
close(c.Send)
}
}
}
func newHub() *Hub {
u := url.URL{Scheme: "ws", Host: "192.168.1.8:20063", Path: "/ws"}
var conn *websocket.Conn
for i := 1; i <= 5; i++ {
d, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil && i < 5 {
log.Println(err)
continue
} else if i >= 5 {
log.Println("Hub: Kabucom connection error")
}
conn = d
break
}
return &Hub{RClients: make(map[*Client]bool), register: make(chan *Client), KabucomConn: conn}
}
func handler(w http.ResponseWriter, r *http.Request, hub *Hub) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
client := &Client{Conn: conn, hub: hub, Send: make(chan []byte, 256)}
go client.writepump()
hub.register <- client
}
func main() {
hub := newHub()
go hub.Run()
go hub.KabucomRun()
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
handler(w, r, hub)
})
log.Println(":20021/ws")
http.ListenAndServe(":20021", nil)
}
Upvotes: 1
Views: 331
Reputation: 45041
Because you are not initializing Hub.kabucomchan
.
func newHub() *Hub {
//...
return &Hub{RClients: make(map[*Client]bool), register: make(chan *Client), KabucomConn: conn, /* kabucomchan ??? */}
}
Send and receive operations on channels assume that both sender c<-
and receiver <-c
hold a reference to the same channel, but when the channel is nil
this reference doesn't exist, and the send and receive just block forever.
Properly initialize the channel in the Hub
constructor:
return &Hub{
RClients: make(map[*Client]bool),
register: make(chan *Client),
KabucomConn: conn,
kabucomchan: make(chan []byte, /* buffered? */), // <--- !
}
Upvotes: 1