web_conn.go 7.4 KB
Newer Older
1
// Copyright (c) 2015 Mattermost, Inc. All Rights Reserved.
=Corey Hulen's avatar
=Corey Hulen committed
2 3
// See License.txt for license information.

4
package app
=Corey Hulen's avatar
=Corey Hulen committed
5 6

import (
7
	"fmt"
8 9
	"time"

10
	"github.com/mattermost/platform/einterfaces"
=Corey Hulen's avatar
=Corey Hulen committed
11
	"github.com/mattermost/platform/model"
12
	"github.com/mattermost/platform/utils"
13

14
	l4g "github.com/alecthomas/log4go"
15 16
	"github.com/gorilla/websocket"
	goi18n "github.com/nicksnyder/go-i18n/i18n"
=Corey Hulen's avatar
=Corey Hulen committed
17 18 19
)

const (
20 21 22 23 24
	WRITE_WAIT                = 30 * time.Second
	PONG_WAIT                 = 100 * time.Second
	PING_PERIOD               = (PONG_WAIT * 6) / 10
	AUTH_TIMEOUT              = 5 * time.Second
	WEBCONN_MEMBER_CACHE_TIME = 1000 * 60 * 30 // 30 minutes
=Corey Hulen's avatar
=Corey Hulen committed
25 26 27
)

type WebConn struct {
28 29 30
	WebSocket                 *websocket.Conn
	Send                      chan model.WebSocketMessage
	SessionToken              string
31
	SessionExpiresAt          int64
32
	Session                   *model.Session
33 34 35 36 37
	UserId                    string
	T                         goi18n.TranslateFunc
	Locale                    string
	AllChannelMembers         map[string]string
	LastAllChannelMembersTime int64
38
	Sequence                  int64
=Corey Hulen's avatar
=Corey Hulen committed
39 40
}

41 42 43
func NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.TranslateFunc, locale string) *WebConn {
	if len(session.UserId) > 0 {
		go SetStatusOnline(session.UserId, session.Id, false)
44
	}
=Corey Hulen's avatar
=Corey Hulen committed
45

46
	return &WebConn{
47 48
		Send:             make(chan model.WebSocketMessage, 256),
		WebSocket:        ws,
49 50 51 52 53
		UserId:           session.UserId,
		SessionToken:     session.Token,
		SessionExpiresAt: session.ExpiresAt,
		T:                t,
		Locale:           locale,
54
	}
=Corey Hulen's avatar
=Corey Hulen committed
55 56
}

57
func (c *WebConn) ReadPump() {
=Corey Hulen's avatar
=Corey Hulen committed
58
	defer func() {
59
		HubUnregister(c)
=Corey Hulen's avatar
=Corey Hulen committed
60 61
		c.WebSocket.Close()
	}()
62
	c.WebSocket.SetReadLimit(model.SOCKET_MAX_MESSAGE_SIZE_KB)
=Corey Hulen's avatar
=Corey Hulen committed
63 64 65
	c.WebSocket.SetReadDeadline(time.Now().Add(PONG_WAIT))
	c.WebSocket.SetPongHandler(func(string) error {
		c.WebSocket.SetReadDeadline(time.Now().Add(PONG_WAIT))
66
		if c.IsAuthenticated() {
67 68
			go SetStatusAwayIfNeeded(c.UserId, false)
		}
=Corey Hulen's avatar
=Corey Hulen committed
69 70 71 72
		return nil
	})

	for {
73 74
		var req model.WebSocketRequest
		if err := c.WebSocket.ReadJSON(&req); err != nil {
75 76 77 78
			// browsers will appear as CloseNoStatusReceived
			if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
				l4g.Debug(fmt.Sprintf("websocket.read: client side closed socket userId=%v", c.UserId))
			} else {
79
				l4g.Debug(fmt.Sprintf("websocket.read: closing websocket for userId=%v error=%v", c.UserId, err.Error()))
80 81
			}

=Corey Hulen's avatar
=Corey Hulen committed
82 83
			return
		} else {
84
			Srv.WebSocketRouter.ServeWebSocket(c, &req)
=Corey Hulen's avatar
=Corey Hulen committed
85 86 87 88
		}
	}
}

89
func (c *WebConn) WritePump() {
=Corey Hulen's avatar
=Corey Hulen committed
90
	ticker := time.NewTicker(PING_PERIOD)
91
	authTicker := time.NewTicker(AUTH_TIMEOUT)
=Corey Hulen's avatar
=Corey Hulen committed
92 93 94

	defer func() {
		ticker.Stop()
95
		authTicker.Stop()
=Corey Hulen's avatar
=Corey Hulen committed
96 97 98 99 100 101 102 103 104 105 106 107
		c.WebSocket.Close()
	}()

	for {
		select {
		case msg, ok := <-c.Send:
			if !ok {
				c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT))
				c.WebSocket.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

108 109 110 111 112 113 114 115 116 117 118
			var msgBytes []byte
			if evt, ok := msg.(*model.WebSocketEvent); ok {
				cpyEvt := &model.WebSocketEvent{}
				*cpyEvt = *evt
				cpyEvt.Sequence = c.Sequence
				msgBytes = []byte(cpyEvt.ToJson())
				c.Sequence++
			} else {
				msgBytes = []byte(msg.ToJson())
			}

119
			c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT))
120
			if err := c.WebSocket.WriteMessage(websocket.TextMessage, msgBytes); err != nil {
121 122 123 124
				// browsers will appear as CloseNoStatusReceived
				if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
					l4g.Debug(fmt.Sprintf("websocket.send: client side closed socket userId=%v", c.UserId))
				} else {
125
					l4g.Debug(fmt.Sprintf("websocket.send: closing websocket for userId=%v, error=%v", c.UserId, err.Error()))
126 127
				}

128
				return
=Corey Hulen's avatar
=Corey Hulen committed
129 130
			}

131 132 133 134 135 136
			if msg.EventType() == model.WEBSOCKET_EVENT_POSTED {
				if einterfaces.GetMetricsInterface() != nil {
					einterfaces.GetMetricsInterface().IncrementPostBroadcast()
				}
			}

=Corey Hulen's avatar
=Corey Hulen committed
137 138 139
		case <-ticker.C:
			c.WebSocket.SetWriteDeadline(time.Now().Add(WRITE_WAIT))
			if err := c.WebSocket.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
140 141 142 143
				// browsers will appear as CloseNoStatusReceived
				if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
					l4g.Debug(fmt.Sprintf("websocket.ticker: client side closed socket userId=%v", c.UserId))
				} else {
144
					l4g.Debug(fmt.Sprintf("websocket.ticker: closing websocket for userId=%v error=%v", c.UserId, err.Error()))
145 146
				}

=Corey Hulen's avatar
=Corey Hulen committed
147 148
				return
			}
149 150 151 152 153 154 155

		case <-authTicker.C:
			if c.SessionToken == "" {
				l4g.Debug(fmt.Sprintf("websocket.authTicker: did not authenticate ip=%v", c.WebSocket.RemoteAddr()))
				return
			}
			authTicker.Stop()
=Corey Hulen's avatar
=Corey Hulen committed
156 157 158 159
		}
	}
}

160 161 162
func (webCon *WebConn) InvalidateCache() {
	webCon.AllChannelMembers = nil
	webCon.LastAllChannelMembersTime = 0
163
	webCon.SessionExpiresAt = 0
164
	webCon.Session = nil
165
}
166

167
func (webCon *WebConn) IsAuthenticated() bool {
168 169 170 171 172 173
	// Check the expiry to see if we need to check for a new session
	if webCon.SessionExpiresAt < model.GetMillis() {
		if webCon.SessionToken == "" {
			return false
		}

174 175 176
		session, err := GetSession(webCon.SessionToken)
		if err != nil {
			l4g.Error(utils.T("api.websocket.invalid_session.error"), err.Error())
177 178
			webCon.SessionToken = ""
			webCon.SessionExpiresAt = 0
179
			webCon.Session = nil
180 181
			return false
		}
182

183 184
		webCon.SessionToken = session.Token
		webCon.SessionExpiresAt = session.ExpiresAt
185
		webCon.Session = session
186 187 188
	}

	return true
189 190
}

191 192
func (webCon *WebConn) SendHello() {
	msg := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_HELLO, "", "", webCon.UserId, nil)
193
	msg.Add("server_version", fmt.Sprintf("%v.%v.%v.%v", model.CurrentVersion, model.BuildNumber, utils.CfgHash, utils.IsLicensed))
194 195 196
	webCon.Send <- msg
}

197
func (webCon *WebConn) ShouldSendEvent(msg *model.WebSocketEvent) bool {
198
	// IMPORTANT: Do not send event if WebConn does not have a session
199
	if !webCon.IsAuthenticated() {
200 201 202
		return false
	}

203 204 205 206 207 208 209 210 211 212 213
	// If the event is destined to a specific user
	if len(msg.Broadcast.UserId) > 0 && webCon.UserId != msg.Broadcast.UserId {
		return false
	}

	// if the user is omitted don't send the message
	if len(msg.Broadcast.OmitUsers) > 0 {
		if _, ok := msg.Broadcast.OmitUsers[webCon.UserId]; ok {
			return false
		}
	}
214

215 216
	// Only report events to users who are in the channel for the event
	if len(msg.Broadcast.ChannelId) > 0 {
217
		if model.GetMillis()-webCon.LastAllChannelMembersTime > WEBCONN_MEMBER_CACHE_TIME {
218 219 220 221 222 223 224 225
			webCon.AllChannelMembers = nil
			webCon.LastAllChannelMembersTime = 0
		}

		if webCon.AllChannelMembers == nil {
			if result := <-Srv.Store.Channel().GetAllChannelMembersForUser(webCon.UserId, true); result.Err != nil {
				l4g.Error("webhub.shouldSendEvent: " + result.Err.Error())
				return false
226
			} else {
227 228
				webCon.AllChannelMembers = result.Data.(map[string]string)
				webCon.LastAllChannelMembersTime = model.GetMillis()
229
			}
230
		}
231

232 233 234 235
		if _, ok := webCon.AllChannelMembers[msg.Broadcast.ChannelId]; ok {
			return true
		} else {
			return false
236 237 238
		}
	}

239 240 241 242 243 244 245
	// Only report events to users who are in the team for the event
	if len(msg.Broadcast.TeamId) > 0 {
		return webCon.IsMemberOfTeam(msg.Broadcast.TeamId)

	}

	return true
246 247
}

248 249
func (webCon *WebConn) IsMemberOfTeam(teamId string) bool {

250 251 252 253
	if webCon.Session == nil {
		session, err := GetSession(webCon.SessionToken)
		if err != nil {
			l4g.Error(utils.T("api.websocket.invalid_session.error"), err.Error())
254
			return false
255 256
		} else {
			webCon.Session = session
257
		}
258 259 260 261 262 263 264 265 266

	}

	member := webCon.Session.GetTeamByTeamId(teamId)

	if member != nil {
		return true
	} else {
		return false
=Corey Hulen's avatar
=Corey Hulen committed
267 268
	}
}