Commit e62afeac authored by Corey Hulen's avatar Corey Hulen Committed by Joram Wilander

Adding slow pump detection to web_conn and better metrics (#6114)

* Adding slow pump detection to web_conn and better metrics

* Fixing bad merge

* Fixing typo
parent 81706b40
......@@ -17,6 +17,9 @@ import (
)
const (
SEND_QUEUE_SIZE = 256
SEND_SLOW_WARN = (SEND_QUEUE_SIZE * 50) / 100
SEND_DEADLOCK_WARN = (SEND_QUEUE_SIZE * 95) / 100
WRITE_WAIT = 30 * time.Second
PONG_WAIT = 100 * time.Second
PING_PERIOD = (PONG_WAIT * 6) / 10
......@@ -44,7 +47,7 @@ func NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.TranslateFun
}
return &WebConn{
Send: make(chan model.WebSocketMessage, 256),
Send: make(chan model.WebSocketMessage, SEND_QUEUE_SIZE),
WebSocket: ws,
UserId: session.UserId,
SessionToken: session.Token,
......@@ -105,35 +108,54 @@ func (c *WebConn) WritePump() {
return
}
var msgBytes []byte
if evt, ok := msg.(*model.WebSocketEvent); ok {
cpyEvt := &model.WebSocketEvent{}
*cpyEvt = *evt
cpyEvt.Sequence = c.Sequence
msgBytes = []byte(cpyEvt.ToJson())
c.Sequence++
} else {
msgBytes = []byte(msg.ToJson())
evt, evtOk := msg.(*model.WebSocketEvent)
skipSend := false
if len(c.Send) >= SEND_SLOW_WARN {
// When the pump starts to get slow we'll drop non-critical messages
if msg.EventType() == model.WEBSOCKET_EVENT_TYPING || msg.EventType() == model.WEBSOCKET_EVENT_STATUS_CHANGE {
l4g.Info(fmt.Sprintf("websocket.slow: dropping message userId=%v type=%v channelId=%v", c.UserId, msg.EventType(), evt.Broadcast.ChannelId))
skipSend = true
}
}
c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT))
if err := c.WebSocket.WriteMessage(websocket.TextMessage, msgBytes); err != nil {
// browsers will appear as CloseNoStatusReceived
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
l4g.Debug(fmt.Sprintf("websocket.send: client side closed socket userId=%v", c.UserId))
if !skipSend {
var msgBytes []byte
if evtOk {
cpyEvt := &model.WebSocketEvent{}
*cpyEvt = *evt
cpyEvt.Sequence = c.Sequence
msgBytes = []byte(cpyEvt.ToJson())
c.Sequence++
} else {
l4g.Debug(fmt.Sprintf("websocket.send: closing websocket for userId=%v, error=%v", c.UserId, err.Error()))
msgBytes = []byte(msg.ToJson())
}
return
}
if len(c.Send) >= SEND_DEADLOCK_WARN {
if evtOk {
l4g.Error(fmt.Sprintf("websocket.full: message userId=%v type=%v channelId=%v size=%v", c.UserId, msg.EventType(), evt.Broadcast.ChannelId, len(msg.ToJson())))
} else {
l4g.Error(fmt.Sprintf("websocket.full: message userId=%v type=%v size=%v", c.UserId, msg.EventType(), len(msg.ToJson())))
}
}
c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT))
if err := c.WebSocket.WriteMessage(websocket.TextMessage, msgBytes); err != nil {
// browsers will appear as CloseNoStatusReceived
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
l4g.Debug(fmt.Sprintf("websocket.send: client side closed socket userId=%v", c.UserId))
} else {
l4g.Debug(fmt.Sprintf("websocket.send: closing websocket for userId=%v, error=%v", c.UserId, err.Error()))
}
return
}
if msg.EventType() == model.WEBSOCKET_EVENT_POSTED {
if einterfaces.GetMetricsInterface() != nil {
einterfaces.GetMetricsInterface().IncrementPostBroadcast()
go einterfaces.GetMetricsInterface().IncrementWebSocketBroadcast(msg.EventType())
}
}
}
case <-ticker.C:
c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT))
if err := c.WebSocket.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
......
......@@ -33,6 +33,7 @@ type MetricsInterface interface {
IncrementMemCacheHitCounterSession()
IncrementWebsocketEvent(eventType string)
IncrementWebSocketBroadcast(eventType string)
AddMemCacheHitCounter(cacheName string, amount float64)
AddMemCacheMissCounter(cacheName string, amount float64)
......
......@@ -34,6 +34,7 @@ const (
WEBSOCKET_AUTHENTICATION_CHALLENGE = "authentication_challenge"
WEBSOCKET_EVENT_REACTION_ADDED = "reaction_added"
WEBSOCKET_EVENT_REACTION_REMOVED = "reaction_removed"
WEBSOCKET_EVENT_RESPONSE = "response"
)
type WebSocketMessage interface {
......@@ -117,7 +118,7 @@ func (o *WebSocketResponse) IsValid() bool {
}
func (o *WebSocketResponse) EventType() string {
return ""
return WEBSOCKET_EVENT_RESPONSE
}
func (o *WebSocketResponse) ToJson() string {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment