Unverified Commit 0fa35c23 authored by Agniva De Sarker's avatar Agniva De Sarker Committed by GitHub
Browse files

MM-27116: Drain push notification channel before closing (#15066) (#15107)

Automatic Merge
parent 10b427a9
......@@ -25,13 +25,16 @@ const (
notificationTypeClear notificationType = "clear"
notificationTypeMessage notificationType = "message"
notificationTypeUpdateBadge notificationType = "update_badge"
notificationTypeDummy notificationType = "dummy"
)
type PushNotificationsHub struct {
notificationsChan chan PushNotification
app *App // XXX: This will go away once push notifications move to their own package.
sema chan struct{}
stopChan chan struct{}
wg *sync.WaitGroup
buffer int
}
type PushNotification struct {
......@@ -256,54 +259,70 @@ func (s *Server) createPushNotificationsHub() {
app: fakeApp,
wg: new(sync.WaitGroup),
sema: make(chan struct{}, runtime.NumCPU()*8), // numCPU * 8 is a good amount of concurrency.
stopChan: make(chan struct{}),
buffer: buffer,
}
go hub.start()
s.PushNotificationsHub = hub
}
func (hub *PushNotificationsHub) start() {
for notification := range hub.notificationsChan {
// Adding to the waitgroup first.
hub.wg.Add(1)
// Get token.
hub.sema <- struct{}{}
go func(notification PushNotification) {
defer func() {
// Release token.
<-hub.sema
// Now marking waitgroup as done.
hub.wg.Done()
}()
var err *model.AppError
switch notification.notificationType {
case notificationTypeClear:
err = hub.app.clearPushNotificationSync(notification.currentSessionId, notification.userId, notification.channelId)
case notificationTypeMessage:
err = hub.app.sendPushNotificationSync(
notification.post,
notification.user,
notification.channel,
notification.channelName,
notification.senderName,
notification.explicitMention,
notification.channelWideMention,
notification.replyToThreadType,
)
case notificationTypeUpdateBadge:
err = hub.app.updateMobileAppBadgeSync(notification.userId)
default:
mlog.Error("Invalid notification type", mlog.String("notification_type", string(notification.notificationType)))
}
if err != nil {
mlog.Error("Unable to send push notification", mlog.String("notification_type", string(notification.notificationType)), mlog.Err(err))
}
}(notification)
for {
select {
case notification := <-hub.notificationsChan:
// Adding to the waitgroup first.
hub.wg.Add(1)
// Get token.
hub.sema <- struct{}{}
go func(notification PushNotification) {
defer func() {
// Release token.
<-hub.sema
// Now marking waitgroup as done.
hub.wg.Done()
}()
var err *model.AppError
switch notification.notificationType {
case notificationTypeClear:
err = hub.app.clearPushNotificationSync(notification.currentSessionId, notification.userId, notification.channelId)
case notificationTypeMessage:
err = hub.app.sendPushNotificationSync(
notification.post,
notification.user,
notification.channel,
notification.channelName,
notification.senderName,
notification.explicitMention,
notification.channelWideMention,
notification.replyToThreadType,
)
case notificationTypeUpdateBadge:
err = hub.app.updateMobileAppBadgeSync(notification.userId)
case notificationTypeDummy:
return
default:
mlog.Error("Invalid notification type", mlog.String("notification_type", string(notification.notificationType)))
}
if err != nil {
mlog.Error("Unable to send push notification", mlog.String("notification_type", string(notification.notificationType)), mlog.Err(err))
}
}(notification)
case <-hub.stopChan:
return
}
}
}
func (hub *PushNotificationsHub) stop() {
// Drain the channel.
for i := 0; i < hub.buffer+1; i++ {
hub.notificationsChan <- PushNotification{
notificationType: notificationTypeDummy,
}
}
hub.stopChan <- struct{}{}
close(hub.notificationsChan)
hub.wg.Wait()
}
......
......@@ -645,7 +645,6 @@ func (s *Server) Shutdown() error {
defer sentry.Flush(2 * time.Second)
s.HubStop()
s.StopPushNotificationsHubWorkers()
s.ShutDownPlugins()
s.RemoveLicenseListener(s.licenseListenerId)
s.RemoveClusterLeaderChangedListener(s.clusterLeaderListenerId)
......@@ -663,6 +662,8 @@ func (s *Server) Shutdown() error {
s.StopHTTPServer()
s.stopLocalModeServer()
// Push notification hub needs to be shutdown after HTTP server
s.StopPushNotificationsHubWorkers()
s.WaitForGoroutines()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment