Commit 19a5d004 authored by Chris's avatar Chris Committed by Christopher Speller

MM-8710: Web Hub optimizations (#8293)

* webhub optimizations

* test fix

* minor fix

* big perf improvement to ToJson after precomputing

* fix hub connection count
parent babd795d
......@@ -30,7 +30,6 @@ type Hub struct {
// See https://github.com/mattermost/mattermost-server/pull/7281
connectionCount int64
app *App
connections []*WebConn
connectionIndex int
register chan *WebConn
unregister chan *WebConn
......@@ -47,7 +46,6 @@ func (a *App) NewWebHub() *Hub {
app: a,
register: make(chan *WebConn, 1),
unregister: make(chan *WebConn, 1),
connections: make([]*WebConn, 0, model.SESSION_CACHE_SIZE),
broadcast: make(chan *model.WebSocketEvent, BROADCAST_QUEUE_SIZE),
stop: make(chan struct{}),
didStop: make(chan struct{}),
......@@ -170,8 +168,14 @@ func (a *App) Publish(message *model.WebSocketEvent) {
}
func (a *App) PublishSkipClusterSend(message *model.WebSocketEvent) {
for _, hub := range a.Hubs {
hub.Broadcast(message)
if message.Broadcast.UserId != "" {
if len(a.Hubs) != 0 {
a.GetHubForUserId(message.Broadcast.UserId).Broadcast(message)
}
} else {
for _, hub := range a.Hubs {
hub.Broadcast(message)
}
}
}
......@@ -362,80 +366,53 @@ func (h *Hub) Start() {
var doRecover func()
doStart = func() {
h.goroutineId = getGoroutineId()
l4g.Debug("Hub for index %v is starting with goroutine %v", h.connectionIndex, h.goroutineId)
connections := newHubConnectionIndex()
for {
select {
case webCon := <-h.register:
h.connections = append(h.connections, webCon)
atomic.StoreInt64(&h.connectionCount, int64(len(h.connections)))
connections.Add(webCon)
atomic.StoreInt64(&h.connectionCount, int64(len(connections.All())))
case webCon := <-h.unregister:
userId := webCon.UserId
found := false
indexToDel := -1
for i, webConCandidate := range h.connections {
if webConCandidate == webCon {
indexToDel = i
continue
}
if userId == webConCandidate.UserId {
found = true
if indexToDel != -1 {
break
}
}
}
connections.Remove(webCon)
if indexToDel != -1 {
// Delete the webcon we are unregistering
h.connections[indexToDel] = h.connections[len(h.connections)-1]
h.connections = h.connections[:len(h.connections)-1]
}
if len(userId) == 0 {
if len(webCon.UserId) == 0 {
continue
}
if !found {
if len(connections.ForUser(webCon.UserId)) == 0 {
h.app.Go(func() {
h.app.SetStatusOffline(userId, false)
h.app.SetStatusOffline(webCon.UserId, false)
})
}
case userId := <-h.invalidateUser:
for _, webCon := range h.connections {
if webCon.UserId == userId {
webCon.InvalidateCache()
}
for _, webCon := range connections.ForUser(userId) {
webCon.InvalidateCache()
}
case msg := <-h.broadcast:
for _, webCon := range h.connections {
candidates := connections.All()
if msg.Broadcast.UserId != "" {
candidates = connections.ForUser(msg.Broadcast.UserId)
}
msg.PrecomputeJSON()
for _, webCon := range candidates {
if webCon.ShouldSendEvent(msg) {
select {
case webCon.Send <- msg:
default:
l4g.Error(fmt.Sprintf("webhub.broadcast: cannot send, closing websocket for userId=%v", webCon.UserId))
close(webCon.Send)
for i, webConCandidate := range h.connections {
if webConCandidate == webCon {
h.connections[i] = h.connections[len(h.connections)-1]
h.connections = h.connections[:len(h.connections)-1]
break
}
}
connections.Remove(webCon)
}
}
}
case <-h.stop:
userIds := make(map[string]bool)
for _, webCon := range h.connections {
for _, webCon := range connections.All() {
userIds[webCon.UserId] = true
webCon.Close()
}
......@@ -444,7 +421,6 @@ func (h *Hub) Start() {
h.app.SetStatusOffline(userId, false)
}
h.connections = make([]*WebConn, 0, model.SESSION_CACHE_SIZE)
h.ExplicitStop = true
close(h.didStop)
......@@ -474,3 +450,60 @@ func (h *Hub) Start() {
go doRecoverableStart()
}
type hubConnectionIndexIndexes struct {
connections int
connectionsByUserId int
}
// hubConnectionIndex provides fast addition, removal, and iteration of web connections.
type hubConnectionIndex struct {
connections []*WebConn
connectionsByUserId map[string][]*WebConn
connectionIndexes map[*WebConn]*hubConnectionIndexIndexes
}
func newHubConnectionIndex() *hubConnectionIndex {
return &hubConnectionIndex{
connections: make([]*WebConn, 0, model.SESSION_CACHE_SIZE),
connectionsByUserId: make(map[string][]*WebConn),
connectionIndexes: make(map[*WebConn]*hubConnectionIndexIndexes),
}
}
func (i *hubConnectionIndex) Add(wc *WebConn) {
i.connections = append(i.connections, wc)
i.connectionsByUserId[wc.UserId] = append(i.connectionsByUserId[wc.UserId], wc)
i.connectionIndexes[wc] = &hubConnectionIndexIndexes{
connections: len(i.connections) - 1,
connectionsByUserId: len(i.connectionsByUserId[wc.UserId]) - 1,
}
}
func (i *hubConnectionIndex) Remove(wc *WebConn) {
indexes, ok := i.connectionIndexes[wc]
if !ok {
return
}
last := i.connections[len(i.connections)-1]
i.connections[indexes.connections] = last
i.connections = i.connections[:len(i.connections)-1]
i.connectionIndexes[last].connections = indexes.connections
userConnections := i.connectionsByUserId[wc.UserId]
last = userConnections[len(userConnections)-1]
userConnections[indexes.connectionsByUserId] = last
i.connectionsByUserId[wc.UserId] = userConnections[:len(userConnections)-1]
i.connectionIndexes[last].connectionsByUserId = indexes.connectionsByUserId
delete(i.connectionIndexes, wc)
}
func (i *hubConnectionIndex) ForUser(id string) []*WebConn {
return i.connectionsByUserId[id]
}
func (i *hubConnectionIndex) All() []*WebConn {
return i.connections
}
......@@ -5,6 +5,7 @@ package model
import (
"encoding/json"
"fmt"
"io"
)
......@@ -58,11 +59,32 @@ type WebsocketBroadcast struct {
TeamId string `json:"team_id"` // broadcast only occurs for users in this team
}
type precomputedWebSocketEventJSON struct {
Event json.RawMessage
Data json.RawMessage
Broadcast json.RawMessage
}
type WebSocketEvent struct {
Event string `json:"event"`
Data map[string]interface{} `json:"data"`
Broadcast *WebsocketBroadcast `json:"broadcast"`
Sequence int64 `json:"seq"`
precomputedJSON *precomputedWebSocketEventJSON
}
// PrecomputeJSON precomputes and stores the serialized JSON for all fields other than Sequence.
// This makes ToJson much more efficient when sending the same event to multiple connections.
func (m *WebSocketEvent) PrecomputeJSON() {
event, _ := json.Marshal(m.Event)
data, _ := json.Marshal(m.Data)
broadcast, _ := json.Marshal(m.Broadcast)
m.precomputedJSON = &precomputedWebSocketEventJSON{
Event: json.RawMessage(event),
Data: json.RawMessage(data),
Broadcast: json.RawMessage(broadcast),
}
}
func (m *WebSocketEvent) Add(key string, value interface{}) {
......@@ -83,6 +105,9 @@ func (o *WebSocketEvent) EventType() string {
}
func (o *WebSocketEvent) ToJson() string {
if o.precomputedJSON != nil {
return fmt.Sprintf(`{"event": %s, "data": %s, "broadcast": %s, "seq": %d}`, o.precomputedJSON.Event, o.precomputedJSON.Data, o.precomputedJSON.Broadcast, o.Sequence)
}
b, _ := json.Marshal(o)
return string(b)
}
......
......@@ -6,6 +6,8 @@ package model
import (
"strings"
"testing"
"github.com/stretchr/testify/assert"
)
func TestWebSocketEvent(t *testing.T) {
......@@ -54,3 +56,49 @@ func TestWebSocketResponse(t *testing.T) {
t.Fatal("Ids do not match")
}
}
func TestWebSocketEvent_PrecomputeJSON(t *testing.T) {
event := NewWebSocketEvent(WEBSOCKET_EVENT_POSTED, "foo", "bar", "baz", nil)
event.Sequence = 7
before := event.ToJson()
event.PrecomputeJSON()
after := event.ToJson()
assert.JSONEq(t, before, after)
}
var stringSink string
func BenchmarkWebSocketEvent_ToJson(b *testing.B) {
event := NewWebSocketEvent(WEBSOCKET_EVENT_POSTED, "foo", "bar", "baz", nil)
for i := 0; i < 100; i++ {
event.Data[NewId()] = NewId()
}
b.Run("SerializedNTimes", func(b *testing.B) {
for i := 0; i < b.N; i++ {
stringSink = event.ToJson()
}
})
b.Run("PrecomputedNTimes", func(b *testing.B) {
for i := 0; i < b.N; i++ {
event.PrecomputeJSON()
}
})
b.Run("PrecomputedAndSerializedNTimes", func(b *testing.B) {
for i := 0; i < b.N; i++ {
event.PrecomputeJSON()
stringSink = event.ToJson()
}
})
event.PrecomputeJSON()
b.Run("PrecomputedOnceAndSerializedNTimes", func(b *testing.B) {
for i := 0; i < b.N; i++ {
stringSink = event.ToJson()
}
})
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment