// Copyright (c) 2015 Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.

4
package app
=Corey Hulen's avatar
=Corey Hulen committed
5 6

import (
7
	"fmt"
8 9
	"hash/fnv"
	"runtime"
10
	"runtime/debug"
11

=Corey Hulen's avatar
=Corey Hulen committed
12
	l4g "github.com/alecthomas/log4go"
13 14

	"github.com/mattermost/platform/einterfaces"
15
	"github.com/mattermost/platform/model"
16
	"github.com/mattermost/platform/utils"
=Corey Hulen's avatar
=Corey Hulen committed
17 18 19
)

type Hub struct {
20
	connections    []*WebConn
21 22 23 24 25
	register       chan *WebConn
	unregister     chan *WebConn
	broadcast      chan *model.WebSocketEvent
	stop           chan string
	invalidateUser chan string
26
	ExplicitStop   bool
=Corey Hulen's avatar
=Corey Hulen committed
27 28
}

29 30 31 32 33 34
// hubs is the package-wide list of hubs. It is replaced by HubStart and reset
// to an empty slice by HubStop.
var hubs = make([]*Hub, 0)

func NewWebHub() *Hub {
	return &Hub{
		register:       make(chan *WebConn),
		unregister:     make(chan *WebConn),
35
		connections:    make([]*WebConn, 0, model.SESSION_CACHE_SIZE),
36 37 38
		broadcast:      make(chan *model.WebSocketEvent, 4096),
		stop:           make(chan string),
		invalidateUser: make(chan string),
39
		ExplicitStop:   false,
40 41 42 43
	}
}

func TotalWebsocketConnections() int {
44 45
	// This is racy, but it's only used for reporting information
	// so it's probably OK
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
	count := 0
	for _, hub := range hubs {
		count = count + len(hub.connections)
	}

	return count
}

// HubStart replaces the package-level hub list with a fresh set of hubs and
// starts each of them. The number of hubs is twice the number of CPUs
// reported by runtime.NumCPU.
func HubStart() {
	hubCount := runtime.NumCPU() * 2
	l4g.Info(utils.T("api.web_hub.start.starting.debug"), hubCount)

	hubs = make([]*Hub, hubCount)

	for i := range hubs {
		hubs[i] = NewWebHub()
		hubs[i].Start()
	}
}

// HubStop sends a stop request to every hub in the package-level list and
// then resets that list to an empty slice.
func HubStop() {
	l4g.Info(utils.T("api.web_hub.start.stopping.debug"))

	running := hubs
	for i := range running {
		running[i].Stop()
	}

	hubs = make([]*Hub, 0)
}

76
func GetHubForUserId(userId string) *Hub {
77
	hash := fnv.New32a()
78
	hash.Write([]byte(userId))
79
	index := hash.Sum32() % uint32(len(hubs))
80 81 82 83 84
	return hubs[index]
}

func HubRegister(webConn *WebConn) {
	GetHubForUserId(webConn.UserId).Register(webConn)
85 86 87
}

func HubUnregister(webConn *WebConn) {
88
	GetHubForUserId(webConn.UserId).Unregister(webConn)
=Corey Hulen's avatar
=Corey Hulen committed
89 90
}

91
func Publish(message *model.WebSocketEvent) {
92

93 94 95 96
	if metrics := einterfaces.GetMetricsInterface(); metrics != nil {
		metrics.IncrementWebsocketEvent(message.Event)
	}

97 98 99
	for _, hub := range hubs {
		hub.Broadcast(message)
	}
100 101 102 103 104 105 106

	if einterfaces.GetClusterInterface() != nil {
		einterfaces.GetClusterInterface().Publish(message)
	}
}

func PublishSkipClusterSend(message *model.WebSocketEvent) {
107 108 109
	for _, hub := range hubs {
		hub.Broadcast(message)
	}
110 111
}

112 113 114
func InvalidateCacheForChannel(channel *model.Channel) {
	InvalidateCacheForChannelSkipClusterSend(channel.Id)
	InvalidateCacheForChannelByNameSkipClusterSend(channel.TeamId, channel.Name)
115 116

	if cluster := einterfaces.GetClusterInterface(); cluster != nil {
117 118 119 120 121 122 123 124 125 126
		cluster.InvalidateCacheForChannel(channel.Id)
		cluster.InvalidateCacheForChannelByName(channel.TeamId, channel.Name)
	}
}

func InvalidateCacheForChannelMembers(channelId string) {
	InvalidateCacheForChannelMembersSkipClusterSend(channelId)

	if cluster := einterfaces.GetClusterInterface(); cluster != nil {
		cluster.InvalidateCacheForChannelMembers(channelId)
127 128 129 130
	}
}

func InvalidateCacheForChannelSkipClusterSend(channelId string) {
131 132 133 134
	Srv.Store.Channel().InvalidateChannel(channelId)
}

func InvalidateCacheForChannelMembersSkipClusterSend(channelId string) {
135 136
	Srv.Store.User().InvalidateProfilesInChannelCache(channelId)
	Srv.Store.Channel().InvalidateMemberCount(channelId)
137 138
}

139 140 141 142 143 144 145 146 147 148 149 150
// InvalidateCacheForChannelMembersNotifyProps invalidates the locally cached
// member notify props for channelId and repeats the invalidation on the
// cluster interface when one is available.
func InvalidateCacheForChannelMembersNotifyProps(channelId string) {
	InvalidateCacheForChannelMembersNotifyPropsSkipClusterSend(channelId)

	cluster := einterfaces.GetClusterInterface()
	if cluster == nil {
		return
	}

	cluster.InvalidateCacheForChannelMembersNotifyProps(channelId)
}

// InvalidateCacheForChannelMembersNotifyPropsSkipClusterSend asks the channel
// store to invalidate its cached member notify props for channelId. Nothing
// is sent to the cluster.
func InvalidateCacheForChannelMembersNotifyPropsSkipClusterSend(channelId string) {
	channelStore := Srv.Store.Channel()
	channelStore.InvalidateCacheForChannelMembersNotifyProps(channelId)
}

151 152
func InvalidateCacheForChannelByNameSkipClusterSend(teamId, name string) {
	Srv.Store.Channel().InvalidateChannelByName(teamId, name)
153 154
}

155 156 157 158 159 160 161 162 163
// InvalidateCacheForChannelPosts invalidates the locally cached post data for
// channelId and repeats the invalidation on the cluster interface when one is
// available.
func InvalidateCacheForChannelPosts(channelId string) {
	InvalidateCacheForChannelPostsSkipClusterSend(channelId)

	cluster := einterfaces.GetClusterInterface()
	if cluster == nil {
		return
	}

	cluster.InvalidateCacheForChannelPosts(channelId)
}

func InvalidateCacheForChannelPostsSkipClusterSend(channelId string) {
164
	Srv.Store.Post().InvalidateLastPostTimeCache(channelId)
165 166
}

167
func InvalidateCacheForUser(userId string) {
168
	InvalidateCacheForUserSkipClusterSend(userId)
169 170 171 172

	if einterfaces.GetClusterInterface() != nil {
		einterfaces.GetClusterInterface().InvalidateCacheForUser(userId)
	}
173 174
}

175 176
func InvalidateCacheForUserSkipClusterSend(userId string) {
	Srv.Store.Channel().InvalidateAllChannelMembersForUser(userId)
177
	Srv.Store.User().InvalidateProfilesInChannelCacheByUser(userId)
178
	Srv.Store.User().InvalidatProfileCacheForUser(userId)
179

180 181 182
	if len(hubs) != 0 {
		GetHubForUserId(userId).InvalidateUser(userId)
	}
183 184
}

185 186 187 188 189 190 191 192 193 194 195 196
// InvalidateCacheForWebhook invalidates the locally cached entry for
// webhookId and repeats the invalidation on the cluster interface when one is
// available.
func InvalidateCacheForWebhook(webhookId string) {
	InvalidateCacheForWebhookSkipClusterSend(webhookId)

	cluster := einterfaces.GetClusterInterface()
	if cluster == nil {
		return
	}

	cluster.InvalidateCacheForWebhook(webhookId)
}

// InvalidateCacheForWebhookSkipClusterSend asks the webhook store to
// invalidate its cached entry for webhookId. Nothing is sent to the cluster.
func InvalidateCacheForWebhookSkipClusterSend(webhookId string) {
	webhookStore := Srv.Store.Webhook()
	webhookStore.InvalidateWebhookCache(webhookId)
}

197 198 199 200 201 202
// InvalidateWebConnSessionCacheForUser asks the hub responsible for userId to
// invalidate the cache of that user's connections. It does nothing when no
// hubs exist.
func InvalidateWebConnSessionCacheForUser(userId string) {
	if len(hubs) == 0 {
		return
	}

	userHub := GetHubForUserId(userId)
	userHub.InvalidateUser(userId)
}

203 204 205 206 207 208 209 210 211 212 213 214
// InvalidateCacheForReactions invalidates the locally cached reactions for
// postId and repeats the invalidation on the cluster interface when one is
// available.
func InvalidateCacheForReactions(postId string) {
	InvalidateCacheForReactionsSkipClusterSend(postId)

	cluster := einterfaces.GetClusterInterface()
	if cluster == nil {
		return
	}

	cluster.InvalidateCacheForReactions(postId)
}

// InvalidateCacheForReactionsSkipClusterSend asks the reaction store to
// invalidate its cache for postId. Nothing is sent to the cluster.
func InvalidateCacheForReactionsSkipClusterSend(postId string) {
	reactionStore := Srv.Store.Reaction()
	reactionStore.InvalidateCacheForPost(postId)
}

=Corey Hulen's avatar
=Corey Hulen committed
215 216
func (h *Hub) Register(webConn *WebConn) {
	h.register <- webConn
217

218
	if webConn.IsAuthenticated() {
219 220
		webConn.SendHello()
	}
=Corey Hulen's avatar
=Corey Hulen committed
221 222 223 224 225 226
}

// Unregister hands webConn to the hub's event loop on the unregister channel
// so the loop can drop it from its connection list. The send blocks until the
// event loop receives it.
func (h *Hub) Unregister(webConn *WebConn) {
	h.unregister <- webConn
}

227
func (h *Hub) Broadcast(message *model.WebSocketEvent) {
228 229 230 231 232
	if message != nil {
		h.broadcast <- message
	}
}

233 234 235 236
// InvalidateUser sends userId to the hub's event loop, which invalidates the
// cache of every attached connection belonging to that user. The send blocks
// until the event loop receives it.
func (h *Hub) InvalidateUser(userId string) {
	h.invalidateUser <- userId
}

237 238
func (h *Hub) Stop() {
	h.stop <- "all"
=Corey Hulen's avatar
=Corey Hulen committed
239 240 241
}

func (h *Hub) Start() {
242 243 244 245 246
	var doStart func()
	var doRecoverableStart func()
	var doRecover func()

	doStart = func() {
=Corey Hulen's avatar
=Corey Hulen committed
247 248
		for {
			select {
249
			case webCon := <-h.register:
250
				h.connections = append(h.connections, webCon)
=Corey Hulen's avatar
=Corey Hulen committed
251

252
			case webCon := <-h.unregister:
253
				userId := webCon.UserId
254

255
				found := false
256 257 258 259 260 261 262
				indexToDel := -1
				for i, webConCandidate := range h.connections {
					if webConCandidate == webCon {
						indexToDel = i
						continue
					}
					if userId == webConCandidate.UserId {
263
						found = true
264 265 266
						if indexToDel != -1 {
							break
						}
267 268 269
					}
				}

270 271 272 273 274 275 276 277 278 279
				if indexToDel != -1 {
					// Delete the webcon we are unregistering
					h.connections[indexToDel] = h.connections[len(h.connections)-1]
					h.connections = h.connections[:len(h.connections)-1]
				}

				if len(userId) == 0 {
					continue
				}

280
				if !found {
281
					go SetStatusOffline(userId, false)
282
				}
283

284
			case userId := <-h.invalidateUser:
285
				for _, webCon := range h.connections {
286 287 288
					if webCon.UserId == userId {
						webCon.InvalidateCache()
					}
=Corey Hulen's avatar
=Corey Hulen committed
289
				}
290

291
			case msg := <-h.broadcast:
292 293 294 295 296 297 298 299 300 301 302 303
				for _, webCon := range h.connections {
					if webCon.ShouldSendEvent(msg) {
						select {
						case webCon.Send <- msg:
						default:
							l4g.Error(fmt.Sprintf("webhub.broadcast: cannot send, closing websocket for userId=%v", webCon.UserId))
							close(webCon.Send)
							for i, webConCandidate := range h.connections {
								if webConCandidate == webCon {
									h.connections[i] = h.connections[len(h.connections)-1]
									h.connections = h.connections[:len(h.connections)-1]
									break
304 305
								}
							}
306 307
						}
					}
308
				}
309

310
			case <-h.stop:
311
				for _, webCon := range h.connections {
312
					webCon.WebSocket.Close()
=Corey Hulen's avatar
=Corey Hulen committed
313
				}
314
				h.ExplicitStop = true
315

316
				return
=Corey Hulen's avatar
=Corey Hulen committed
317 318
			}
		}
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340
	}

	doRecoverableStart = func() {
		defer doRecover()
		doStart()
	}

	doRecover = func() {
		if !h.ExplicitStop {
			if r := recover(); r != nil {
				l4g.Error(fmt.Sprintf("Recovering from Hub panic. Panic was: %v", r))
			} else {
				l4g.Error("Webhub stopped unexpectedly. Recovering.")
			}

			l4g.Error(string(debug.Stack()))

			go doRecoverableStart()
		}
	}

	go doRecoverableStart()
=Corey Hulen's avatar
=Corey Hulen committed
341
}