// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.

package jobs

import (
	"math/rand"
	"time"

	l4g "github.com/alecthomas/log4go"
Christopher Speller's avatar
Christopher Speller committed
11
	"github.com/mattermost/mattermost-server/model"
12 13 14
)

const (
	// DEFAULT_WATCHER_POLLING_INTERVAL is the default watcher polling
	// interval. Watcher.Start multiplies its polling interval by
	// time.Millisecond, so this value corresponds to 15 seconds when it is
	// used as a Watcher's pollingInterval.
	DEFAULT_WATCHER_POLLING_INTERVAL = 15000
)

type Watcher struct {
19
	srv     *JobServer
20 21
	workers *Workers

22 23 24
	stop            chan bool
	stopped         chan bool
	pollingInterval int
25 26
}

27
func (srv *JobServer) MakeWatcher(workers *Workers, pollingInterval int) *Watcher {
28
	return &Watcher{
29 30 31 32
		stop:            make(chan bool, 1),
		stopped:         make(chan bool, 1),
		pollingInterval: pollingInterval,
		workers:         workers,
33
		srv:             srv,
34 35 36 37 38 39 40 41 42
	}
}

func (watcher *Watcher) Start() {
	l4g.Debug("Watcher Started")

	// Delay for some random number of milliseconds before starting to ensure that multiple
	// instances of the jobserver  don't poll at a time too close to each other.
	rand.Seed(time.Now().UTC().UnixNano())
Chris's avatar
Chris committed
43
	<-time.After(time.Duration(rand.Intn(watcher.pollingInterval)) * time.Millisecond)
44

Chris's avatar
Chris committed
45
	defer func() {
46 47 48 49 50 51 52 53 54
		l4g.Debug("Watcher Finished")
		watcher.stopped <- true
	}()

	for {
		select {
		case <-watcher.stop:
			l4g.Debug("Watcher: Received stop signal")
			return
55
		case <-time.After(time.Duration(watcher.pollingInterval) * time.Millisecond):
56 57 58 59 60 61 62 63 64 65 66 67
			watcher.PollAndNotify()
		}
	}
}

// Stop asks the watcher loop to exit and blocks until Start acknowledges by
// sending on the stopped channel.
//
// NOTE(review): because this waits on the stopped channel, it looks like it
// must be paired with a running (or subsequently started) Start call;
// otherwise it would block indefinitely - confirm against callers.
func (watcher *Watcher) Stop() {
	l4g.Debug("Watcher Stopping")
	// Request shutdown first, then wait for the loop's acknowledgement.
	watcher.stop <- true
	<-watcher.stopped
}

func (watcher *Watcher) PollAndNotify() {
68
	if result := <-watcher.srv.Store.Job().GetAllByStatus(model.JOB_STATUS_PENDING); result.Err != nil {
69 70
		l4g.Error("Error occured getting all pending statuses: %v", result.Err.Error())
	} else {
71
		jobs := result.Data.([]*model.Job)
72

73 74
		for _, job := range jobs {
			if job.Type == model.JOB_TYPE_DATA_RETENTION {
75 76
				if watcher.workers.DataRetention != nil {
					select {
77
					case watcher.workers.DataRetention.JobChannel() <- *job:
78 79 80
					default:
					}
				}
81
			} else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_INDEXING {
82 83
				if watcher.workers.ElasticsearchIndexing != nil {
					select {
84 85 86 87 88 89 90 91
					case watcher.workers.ElasticsearchIndexing.JobChannel() <- *job:
					default:
					}
				}
			} else if job.Type == model.JOB_TYPE_ELASTICSEARCH_POST_AGGREGATION {
				if watcher.workers.ElasticsearchAggregation != nil {
					select {
					case watcher.workers.ElasticsearchAggregation.JobChannel() <- *job:
92 93 94
					default:
					}
				}
95 96 97 98 99 100 101
			} else if job.Type == model.JOB_TYPE_LDAP_SYNC {
				if watcher.workers.LdapSync != nil {
					select {
					case watcher.workers.LdapSync.JobChannel() <- *job:
					default:
					}
				}
102 103 104 105
			}
		}
	}
}