Commit 2114e96f authored by Joram Wilander's avatar Joram Wilander Committed by Christopher Speller

Reset status to away after web conn disconnect if necessary (#8988)

parent 437f9f5b
......@@ -227,6 +227,9 @@ func (a *App) AttachDeviceId(sessionId string, deviceId string, expiresAt int64)
func (a *App) UpdateLastActivityAtIfNeeded(session model.Session) {
now := model.GetMillis()
a.UpdateWebConnUserActivity(session, now)
if now-session.LastActivityAt < model.SESSION_ACTIVITY_TIMEOUT {
return
}
......
......@@ -161,6 +161,22 @@ func (a *App) GetUserStatusesByIds(userIds []string) ([]*model.Status, *model.Ap
return statusMap, nil
}
// SetStatusLastActivityAt sets the last activity at for a user on the local app server and updates
// status to away if needed. Used by the WS to set status to away if an 'online' device disconnects
// while an 'away' device is still connected
func (a *App) SetStatusLastActivityAt(userId string, activityAt int64) {
var status *model.Status
var err *model.AppError
if status, err = a.GetStatus(userId); err != nil {
return
}
status.LastActivityAt = activityAt
a.AddStatusCacheSkipClusterSend(status)
a.SetStatusAwayIfNeeded(userId, false)
}
func (a *App) SetStatusOnline(userId string, sessionId string, manual bool) {
if !*a.Config().ServiceSettings.EnableUserStatuses {
return
......
......@@ -33,6 +33,7 @@ type WebConn struct {
Send chan model.WebSocketMessage
sessionToken atomic.Value
session atomic.Value
LastUserActivityAt int64
UserId string
T goi18n.TranslateFunc
Locale string
......@@ -52,14 +53,15 @@ func (a *App) NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.Tra
}
wc := &WebConn{
App: a,
Send: make(chan model.WebSocketMessage, SEND_QUEUE_SIZE),
WebSocket: ws,
UserId: session.UserId,
T: t,
Locale: locale,
endWritePump: make(chan struct{}, 2),
pumpFinished: make(chan struct{}, 1),
App: a,
Send: make(chan model.WebSocketMessage, SEND_QUEUE_SIZE),
WebSocket: ws,
LastUserActivityAt: model.GetMillis(),
UserId: session.UserId,
T: t,
Locale: locale,
endWritePump: make(chan struct{}, 2),
pumpFinished: make(chan struct{}, 1),
}
wc.SetSession(&session)
......
......@@ -23,6 +23,12 @@ const (
DEADLOCK_WARN = (BROADCAST_QUEUE_SIZE * 99) / 100 // number of buffered messages before printing stack trace
)
type WebConnActivityMessage struct {
UserId string
SessionToken string
ActivityAt int64
}
type Hub struct {
// connectionCount should be kept first.
// See https://github.com/mattermost/mattermost-server/pull/7281
......@@ -35,6 +41,7 @@ type Hub struct {
stop chan struct{}
didStop chan struct{}
invalidateUser chan string
activity chan *WebConnActivityMessage
ExplicitStop bool
goroutineId int
}
......@@ -48,6 +55,7 @@ func (a *App) NewWebHub() *Hub {
stop: make(chan struct{}),
didStop: make(chan struct{}),
invalidateUser: make(chan string),
activity: make(chan *WebConnActivityMessage),
ExplicitStop: false,
}
}
......@@ -330,6 +338,13 @@ func (a *App) InvalidateWebConnSessionCacheForUser(userId string) {
}
}
func (a *App) UpdateWebConnUserActivity(session model.Session, activityAt int64) {
hub := a.GetHubForUserId(session.UserId)
if hub != nil {
hub.UpdateActivity(session.UserId, session.Token, activityAt)
}
}
func (h *Hub) Register(webConn *WebConn) {
h.register <- webConn
......@@ -355,6 +370,10 @@ func (h *Hub) InvalidateUser(userId string) {
h.invalidateUser <- userId
}
func (h *Hub) UpdateActivity(userId, sessionToken string, activityAt int64) {
h.activity <- &WebConnActivityMessage{UserId: userId, SessionToken: sessionToken, ActivityAt: activityAt}
}
func getGoroutineId() int {
var buf [64]byte
n := runtime.Stack(buf[:], false)
......@@ -395,15 +414,34 @@ func (h *Hub) Start() {
continue
}
if len(connections.ForUser(webCon.UserId)) == 0 {
conns := connections.ForUser(webCon.UserId)
if len(conns) == 0 {
h.app.Go(func() {
h.app.SetStatusOffline(webCon.UserId, false)
})
} else {
var latestActivity int64 = 0
for _, conn := range conns {
if conn.LastUserActivityAt > latestActivity {
latestActivity = conn.LastUserActivityAt
}
}
if h.app.IsUserAway(latestActivity) {
h.app.Go(func() {
h.app.SetStatusLastActivityAt(webCon.UserId, latestActivity)
})
}
}
case userId := <-h.invalidateUser:
for _, webCon := range connections.ForUser(userId) {
webCon.InvalidateCache()
}
case activity := <-h.activity:
for _, webCon := range connections.ForUser(activity.UserId) {
if webCon.GetSessionToken() == activity.SessionToken {
webCon.LastUserActivityAt = activity.ActivityAt
}
}
case msg := <-h.broadcast:
candidates := connections.All()
if msg.Broadcast.UserId != "" {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment