Commit 95da05a8 authored by Joram Wilander's avatar Joram Wilander Committed by GitHub
Browse files

PLT-5750 Add sequence number to websocket connections and events (#5907)

* Add sequence number to websocket connections and events

* Copy pointer instead of pass by value and use int64 over uint64

* Add more logging to missed events
parent d39947f5
...@@ -35,6 +35,7 @@ type WebConn struct { ...@@ -35,6 +35,7 @@ type WebConn struct {
Locale string Locale string
AllChannelMembers map[string]string AllChannelMembers map[string]string
LastAllChannelMembersTime int64 LastAllChannelMembersTime int64
Sequence int64
} }
func NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.TranslateFunc, locale string) *WebConn { func NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.TranslateFunc, locale string) *WebConn {
...@@ -104,8 +105,19 @@ func (c *WebConn) WritePump() { ...@@ -104,8 +105,19 @@ func (c *WebConn) WritePump() {
return return
} }
var msgBytes []byte
if evt, ok := msg.(*model.WebSocketEvent); ok {
cpyEvt := &model.WebSocketEvent{}
*cpyEvt = *evt
cpyEvt.Sequence = c.Sequence
msgBytes = []byte(cpyEvt.ToJson())
c.Sequence++
} else {
msgBytes = []byte(msg.ToJson())
}
c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT)) c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT))
if err := c.WebSocket.WriteMessage(websocket.TextMessage, msg.GetPreComputeJson()); err != nil { if err := c.WebSocket.WriteMessage(websocket.TextMessage, msgBytes); err != nil {
// browsers will appear as CloseNoStatusReceived // browsers will appear as CloseNoStatusReceived
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) { if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
l4g.Debug(fmt.Sprintf("websocket.send: client side closed socket userId=%v", c.UserId)) l4g.Debug(fmt.Sprintf("websocket.send: client side closed socket userId=%v", c.UserId))
...@@ -179,7 +191,6 @@ func (webCon *WebConn) IsAuthenticated() bool { ...@@ -179,7 +191,6 @@ func (webCon *WebConn) IsAuthenticated() bool {
func (webCon *WebConn) SendHello() { func (webCon *WebConn) SendHello() {
msg := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_HELLO, "", "", webCon.UserId, nil) msg := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_HELLO, "", "", webCon.UserId, nil)
msg.Add("server_version", fmt.Sprintf("%v.%v.%v.%v", model.CurrentVersion, model.BuildNumber, utils.CfgHash, utils.IsLicensed)) msg.Add("server_version", fmt.Sprintf("%v.%v.%v.%v", model.CurrentVersion, model.BuildNumber, utils.CfgHash, utils.IsLicensed))
msg.DoPreComputeJson()
webCon.Send <- msg webCon.Send <- msg
} }
......
...@@ -94,7 +94,6 @@ func Publish(message *model.WebSocketEvent) { ...@@ -94,7 +94,6 @@ func Publish(message *model.WebSocketEvent) {
metrics.IncrementWebsocketEvent(message.Event) metrics.IncrementWebsocketEvent(message.Event)
} }
message.DoPreComputeJson()
for _, hub := range hubs { for _, hub := range hubs {
hub.Broadcast(message) hub.Broadcast(message)
} }
...@@ -105,7 +104,6 @@ func Publish(message *model.WebSocketEvent) { ...@@ -105,7 +104,6 @@ func Publish(message *model.WebSocketEvent) {
} }
func PublishSkipClusterSend(message *model.WebSocketEvent) { func PublishSkipClusterSend(message *model.WebSocketEvent) {
message.DoPreComputeJson()
for _, hub := range hubs { for _, hub := range hubs {
hub.Broadcast(message) hub.Broadcast(message)
} }
......
...@@ -61,7 +61,6 @@ func (wr *WebSocketRouter) ServeWebSocket(conn *WebConn, r *model.WebSocketReque ...@@ -61,7 +61,6 @@ func (wr *WebSocketRouter) ServeWebSocket(conn *WebConn, r *model.WebSocketReque
HubRegister(conn) HubRegister(conn)
resp := model.NewWebSocketResponse(model.STATUS_OK, r.Seq, nil) resp := model.NewWebSocketResponse(model.STATUS_OK, r.Seq, nil)
resp.DoPreComputeJson()
conn.Send <- resp conn.Send <- resp
} }
...@@ -91,7 +90,6 @@ func ReturnWebSocketError(conn *WebConn, r *model.WebSocketRequest, err *model.A ...@@ -91,7 +90,6 @@ func ReturnWebSocketError(conn *WebConn, r *model.WebSocketRequest, err *model.A
err.DetailedError = "" err.DetailedError = ""
errorResp := model.NewWebSocketError(r.Seq, err) errorResp := model.NewWebSocketError(r.Seq, err)
errorResp.DoPreComputeJson()
conn.Send <- errorResp conn.Send <- errorResp
} }
...@@ -36,8 +36,6 @@ const ( ...@@ -36,8 +36,6 @@ const (
type WebSocketMessage interface { type WebSocketMessage interface {
ToJson() string ToJson() string
IsValid() bool IsValid() bool
DoPreComputeJson()
GetPreComputeJson() []byte
EventType() string EventType() string
} }
...@@ -52,7 +50,7 @@ type WebSocketEvent struct { ...@@ -52,7 +50,7 @@ type WebSocketEvent struct {
Event string `json:"event"` Event string `json:"event"`
Data map[string]interface{} `json:"data"` Data map[string]interface{} `json:"data"`
Broadcast *WebsocketBroadcast `json:"broadcast"` Broadcast *WebsocketBroadcast `json:"broadcast"`
PreComputeJson []byte `json:"-"` Sequence int64 `json:"seq"`
} }
func (m *WebSocketEvent) Add(key string, value interface{}) { func (m *WebSocketEvent) Add(key string, value interface{}) {
...@@ -72,19 +70,6 @@ func (o *WebSocketEvent) EventType() string { ...@@ -72,19 +70,6 @@ func (o *WebSocketEvent) EventType() string {
return o.Event return o.Event
} }
func (o *WebSocketEvent) DoPreComputeJson() {
b, err := json.Marshal(o)
if err != nil {
o.PreComputeJson = []byte("")
} else {
o.PreComputeJson = b
}
}
func (o *WebSocketEvent) GetPreComputeJson() []byte {
return o.PreComputeJson
}
func (o *WebSocketEvent) ToJson() string { func (o *WebSocketEvent) ToJson() string {
b, err := json.Marshal(o) b, err := json.Marshal(o)
if err != nil { if err != nil {
...@@ -110,7 +95,6 @@ type WebSocketResponse struct { ...@@ -110,7 +95,6 @@ type WebSocketResponse struct {
SeqReply int64 `json:"seq_reply,omitempty"` SeqReply int64 `json:"seq_reply,omitempty"`
Data map[string]interface{} `json:"data,omitempty"` Data map[string]interface{} `json:"data,omitempty"`
Error *AppError `json:"error,omitempty"` Error *AppError `json:"error,omitempty"`
PreComputeJson []byte `json:"-"`
} }
func (m *WebSocketResponse) Add(key string, value interface{}) { func (m *WebSocketResponse) Add(key string, value interface{}) {
...@@ -142,19 +126,6 @@ func (o *WebSocketResponse) ToJson() string { ...@@ -142,19 +126,6 @@ func (o *WebSocketResponse) ToJson() string {
} }
} }
func (o *WebSocketResponse) DoPreComputeJson() {
b, err := json.Marshal(o)
if err != nil {
o.PreComputeJson = []byte("")
} else {
o.PreComputeJson = b
}
}
func (o *WebSocketResponse) GetPreComputeJson() []byte {
return o.PreComputeJson
}
func WebSocketResponseFromJson(data io.Reader) *WebSocketResponse { func WebSocketResponseFromJson(data io.Reader) *WebSocketResponse {
decoder := json.NewDecoder(data) decoder := json.NewDecoder(data)
var o WebSocketResponse var o WebSocketResponse
......
...@@ -61,6 +61,13 @@ export function initialize() { ...@@ -61,6 +61,13 @@ export function initialize() {
WebSocketClient.setEventCallback(handleEvent); WebSocketClient.setEventCallback(handleEvent);
WebSocketClient.setFirstConnectCallback(handleFirstConnect); WebSocketClient.setFirstConnectCallback(handleFirstConnect);
WebSocketClient.setReconnectCallback(() => reconnect(false));
WebSocketClient.setMissedEventCallback(() => {
if (global.window.mm_config.EnableDeveloper === 'true') {
Client.logClientError('missed websocket event seq=' + WebSocketClient.eventSequence);
}
reconnect(false);
});
WebSocketClient.setCloseCallback(handleClose); WebSocketClient.setCloseCallback(handleClose);
WebSocketClient.initialize(connUrl); WebSocketClient.initialize(connUrl);
} }
......
...@@ -10,11 +10,13 @@ export default class WebSocketClient { ...@@ -10,11 +10,13 @@ export default class WebSocketClient {
this.conn = null; this.conn = null;
this.connectionUrl = null; this.connectionUrl = null;
this.sequence = 1; this.sequence = 1;
this.eventSequence = 0;
this.connectFailCount = 0; this.connectFailCount = 0;
this.eventCallback = null; this.eventCallback = null;
this.responseCallbacks = {}; this.responseCallbacks = {};
this.firstConnectCallback = null; this.firstConnectCallback = null;
this.reconnectCallback = null; this.reconnectCallback = null;
this.missedEventCallback = null;
this.errorCallback = null; this.errorCallback = null;
this.closeCallback = null; this.closeCallback = null;
} }
...@@ -37,6 +39,8 @@ export default class WebSocketClient { ...@@ -37,6 +39,8 @@ export default class WebSocketClient {
this.connectionUrl = connectionUrl; this.connectionUrl = connectionUrl;
this.conn.onopen = () => { this.conn.onopen = () => {
this.eventSequence = 0;
if (token) { if (token) {
this.sendMessage('authentication_challenge', {token}); this.sendMessage('authentication_challenge', {token});
} }
...@@ -108,6 +112,11 @@ export default class WebSocketClient { ...@@ -108,6 +112,11 @@ export default class WebSocketClient {
Reflect.deleteProperty(this.responseCallbacks, msg.seq_reply); Reflect.deleteProperty(this.responseCallbacks, msg.seq_reply);
} }
} else if (this.eventCallback) { } else if (this.eventCallback) {
if (msg.seq !== this.eventSequence && this.missedEventCallback) {
console.log('missed websocket event, act_seq=' + msg.seq + ' exp_seq=' + this.eventSequence); //eslint-disable-line no-console
this.missedEventCallback();
}
this.eventSequence = msg.seq + 1;
this.eventCallback(msg); this.eventCallback(msg);
} }
}; };
...@@ -125,6 +134,10 @@ export default class WebSocketClient { ...@@ -125,6 +134,10 @@ export default class WebSocketClient {
this.reconnectCallback = callback; this.reconnectCallback = callback;
} }
setMissedEventCallback(callback) {
this.missedEventCallback = callback;
}
setErrorCallback(callback) { setErrorCallback(callback) {
this.errorCallback = callback; this.errorCallback = callback;
} }
......
...@@ -27,7 +27,6 @@ func (wh webSocketHandler) ServeWebSocket(conn *app.WebConn, r *model.WebSocketR ...@@ -27,7 +27,6 @@ func (wh webSocketHandler) ServeWebSocket(conn *app.WebConn, r *model.WebSocketR
l4g.Error(utils.T("api.web_socket_handler.log.error"), "/api/v3/users/websocket", r.Action, r.Seq, conn.UserId, sessionErr.SystemMessage(utils.T), sessionErr.Error()) l4g.Error(utils.T("api.web_socket_handler.log.error"), "/api/v3/users/websocket", r.Action, r.Seq, conn.UserId, sessionErr.SystemMessage(utils.T), sessionErr.Error())
sessionErr.DetailedError = "" sessionErr.DetailedError = ""
errResp := model.NewWebSocketError(r.Seq, sessionErr) errResp := model.NewWebSocketError(r.Seq, sessionErr)
errResp.DoPreComputeJson()
conn.Send <- errResp conn.Send <- errResp
return return
...@@ -44,14 +43,12 @@ func (wh webSocketHandler) ServeWebSocket(conn *app.WebConn, r *model.WebSocketR ...@@ -44,14 +43,12 @@ func (wh webSocketHandler) ServeWebSocket(conn *app.WebConn, r *model.WebSocketR
l4g.Error(utils.T("api.web_socket_handler.log.error"), "/api/v3/users/websocket", r.Action, r.Seq, r.Session.UserId, err.SystemMessage(utils.T), err.DetailedError) l4g.Error(utils.T("api.web_socket_handler.log.error"), "/api/v3/users/websocket", r.Action, r.Seq, r.Session.UserId, err.SystemMessage(utils.T), err.DetailedError)
err.DetailedError = "" err.DetailedError = ""
errResp := model.NewWebSocketError(r.Seq, err) errResp := model.NewWebSocketError(r.Seq, err)
errResp.DoPreComputeJson()
conn.Send <- errResp conn.Send <- errResp
return return
} }
resp := model.NewWebSocketResponse(model.STATUS_OK, r.Seq, data) resp := model.NewWebSocketResponse(model.STATUS_OK, r.Seq, data)
resp.DoPreComputeJson()
conn.Send <- resp conn.Send <- resp
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment