Unverified Commit 51bd710e authored by George Goldberg's avatar George Goldberg Committed by GitHub

MM-9728: Online migration for advanced permissions phase 2 (#8744)

* MM-9728: Online migration for advanced permissions phase 2

* Add unit tests for new store functions.

* Move migration specific code to own file.

* Add migration state function test.

* Style fixes.

* Add i18n strings.

* Fix mocks.

* Add TestMain to migrations package tests.

* Fix typo.

* Fix review comments.

* Fix up the "Check if migration is done" check to actually work.
parent 91557bbd
......@@ -12,6 +12,8 @@ import (
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
......@@ -1890,6 +1892,17 @@ func TestUpdateChannelScheme(t *testing.T) {
th.App.SetLicense(model.NewTestLicense(""))
// Mark the migration as done.
<-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2)
res := <-th.App.Srv.Store.System().Save(&model.System{Name: model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2, Value: "true"})
assert.Nil(t, res.Err)
// Un-mark the migration at the end of the test.
defer func() {
res := <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2)
assert.Nil(t, res.Err)
}()
team := &model.Team{
DisplayName: "Name",
Description: "Some description",
......
This diff is collapsed.
......@@ -2059,6 +2059,17 @@ func TestUpdateTeamScheme(t *testing.T) {
th.App.SetLicense(model.NewTestLicense(""))
// Mark the migration as done.
<-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2)
res := <-th.App.Srv.Store.System().Save(&model.System{Name: model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2, Value: "true"})
assert.Nil(t, res.Err)
// Un-mark the migration at the end of the test.
defer func() {
res := <-th.App.Srv.Store.System().PermanentDeleteByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2)
assert.Nil(t, res.Err)
}()
team := &model.Team{
DisplayName: "Name",
Description: "Some description",
......
......@@ -20,6 +20,7 @@ import (
"github.com/mattermost/mattermost-server/einterfaces"
ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
"github.com/mattermost/mattermost-server/jobs"
tjobs "github.com/mattermost/mattermost-server/jobs/interfaces"
"github.com/mattermost/mattermost-server/mlog"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/plugin/pluginenv"
......@@ -319,6 +320,12 @@ func RegisterJobsLdapSyncInterface(f func(*App) ejobs.LdapSyncInterface) {
jobsLdapSyncInterface = f
}
var jobsMigrationsInterface func(*App) tjobs.MigrationsJobInterface
func RegisterJobsMigrationsJobInterface(f func(*App) tjobs.MigrationsJobInterface) {
jobsMigrationsInterface = f
}
var ldapInterface func(*App) einterfaces.LdapInterface
func RegisterLdapInterface(f func(*App) einterfaces.LdapInterface) {
......@@ -413,6 +420,9 @@ func (a *App) initJobs() {
if jobsLdapSyncInterface != nil {
a.Jobs.LdapSync = jobsLdapSyncInterface(a)
}
if jobsMigrationsInterface != nil {
a.Jobs.Migrations = jobsMigrationsInterface(a)
}
}
func (a *App) DiagnosticId() string {
......
......@@ -3,7 +3,9 @@
package app
import "github.com/mattermost/mattermost-server/model"
import (
"github.com/mattermost/mattermost-server/model"
)
func (a *App) GetScheme(id string) (*model.Scheme, *model.AppError) {
if result := <-a.Srv.Store.Scheme().Get(id); result.Err != nil {
......@@ -109,7 +111,9 @@ func (a *App) GetChannelsForScheme(scheme *model.Scheme, offset int, limit int)
}
func (a *App) IsPhase2MigrationCompleted() *model.AppError {
// TODO: Actually check the Phase 2 migration has completed before permitting these actions.
if result := <-a.Srv.Store.System().GetByName(model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2); result.Err != nil {
return result.Err
}
return nil
}
......@@ -47,6 +47,54 @@
"id": "September",
"translation": "September"
},
{
"id": "migrations.worker.run_advanced_permissions_phase_2_migration.invalid_progress",
"translation": "Migration failed due to invalid progress data."
},
{
"id": "migrations.worker.run_migration.unknown_key",
"translation": "Cannot run migration job due to unknown migration key."
},
{
"id": "store.sql_channel.migrate_channel_members.open_transaction.app_error",
"translation": "Failed to open the database transaction"
},
{
"id": "store.sql_channel.migrate_channel_members.select.app_error",
"translation": "Failed to select the batch of channel members"
},
{
"id": "store.sql_channel.migrate_channel_members.rollback_transaction.app_error",
"translation": "Failed to roll back the database transaction"
},
{
"id": "store.sql_channel.migrate_channel_members.update.app_error",
"translation": "Failed to update the channel member"
},
{
"id": "store.sql_channel.migrate_channel_members.commit_transaction.app_error",
"translation": "Failed to commit the database transaction"
},
{
"id": "store.sql_team.migrate_team_members.open_transaction.app_error",
"translation": "Failed to open the database transaction"
},
{
"id": "store.sql_team.migrate_team_members.select.app_error",
"translation": " Failed to select the batch of team members"
},
{
"id": "store.sql_team.migrate_team_members.rollback_transaction.app_error",
"translation": "Failed to roll back the database transaction"
},
{
"id": "store.sql_team.migrate_team_members.update.app_error",
"translation": "Failed to update the team member"
},
{
"id": "store.sql_team.migrate_team_members.commit_transaction.app_error",
"translation": "Failed to commit the database transaction"
},
{
"id": "api.admin.add_certificate.no_file.app_error",
"translation": "No file under 'certificate' in request."
......
......@@ -4,3 +4,7 @@
package imports
// This is a placeholder so this package can be imported in Team Edition when it will be otherwise empty
import (
_ "github.com/mattermost/mattermost-server/migrations"
)
// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.
package interfaces
import "github.com/mattermost/mattermost-server/model"
type MigrationsJobInterface interface {
MakeWorker() model.Worker
MakeScheduler() model.Scheduler
}
......@@ -106,6 +106,13 @@ func (srv *JobServer) SetJobCanceled(job *model.Job) *model.AppError {
return result.Err
}
func (srv *JobServer) UpdateInProgressJobData(job *model.Job) *model.AppError {
job.Status = model.JOB_STATUS_IN_PROGRESS
job.LastActivityAt = model.GetMillis()
result := <-srv.Store.Job().UpdateOptimistically(job, model.JOB_STATUS_IN_PROGRESS)
return result.Err
}
func (srv *JobServer) RequestCancellation(jobId string) *model.AppError {
if result := <-srv.Store.Job().UpdateStatusOptimistically(jobId, model.JOB_STATUS_PENDING, model.JOB_STATUS_CANCELED); result.Err != nil {
return result.Err
......
......@@ -107,6 +107,13 @@ func (watcher *Watcher) PollAndNotify() {
default:
}
}
} else if job.Type == model.JOB_TYPE_MIGRATIONS {
if watcher.workers.Migrations != nil {
select {
case watcher.workers.Migrations.JobChannel() <- *job:
default:
}
}
}
}
}
......
......@@ -50,6 +50,10 @@ func (srv *JobServer) InitSchedulers() *Schedulers {
schedulers.schedulers = append(schedulers.schedulers, ldapSyncInterface.MakeScheduler())
}
if migrationsInterface := srv.Migrations; migrationsInterface != nil {
schedulers.schedulers = append(schedulers.schedulers, migrationsInterface.MakeScheduler())
}
schedulers.nextRunTimes = make([]*time.Time, len(schedulers.schedulers))
return schedulers
}
......
......@@ -5,6 +5,7 @@ package jobs
import (
ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
tjobs "github.com/mattermost/mattermost-server/jobs/interfaces"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store"
)
......@@ -34,6 +35,7 @@ type JobServer struct {
ElasticsearchAggregator ejobs.ElasticsearchAggregatorInterface
ElasticsearchIndexer ejobs.ElasticsearchIndexerInterface
LdapSync ejobs.LdapSyncInterface
Migrations tjobs.MigrationsJobInterface
}
func NewJobServer(configService ConfigService, store store.Store) *JobServer {
......
......@@ -20,6 +20,7 @@ type Workers struct {
ElasticsearchIndexing model.Worker
ElasticsearchAggregation model.Worker
LdapSync model.Worker
Migrations model.Worker
listenerId string
}
......@@ -50,6 +51,10 @@ func (srv *JobServer) InitWorkers() *Workers {
workers.LdapSync = ldapSyncInterface.MakeWorker()
}
if migrationsInterface := srv.Migrations; migrationsInterface != nil {
workers.Migrations = migrationsInterface.MakeWorker()
}
return workers
}
......@@ -77,6 +82,10 @@ func (workers *Workers) Start() *Workers {
go workers.LdapSync.Run()
}
if workers.Migrations != nil {
go workers.Migrations.Run()
}
go workers.Watcher.Start()
})
......@@ -152,6 +161,10 @@ func (workers *Workers) Stop() *Workers {
workers.LdapSync.Stop()
}
if workers.Migrations != nil {
workers.Migrations.Stop()
}
mlog.Info("Stopped workers")
return workers
......
// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.
package migrations
import (
"encoding/json"
"io"
"net/http"
"strings"
"github.com/mattermost/mattermost-server/model"
)
type AdvancedPermissionsPhase2Progress struct {
CurrentTable string `json:"current_table"`
LastTeamId string `json:"last_team_id"`
LastChannelId string `json:"last_channel_id"`
LastUserId string `json:"last_user"`
}
func (p *AdvancedPermissionsPhase2Progress) ToJson() string {
b, _ := json.Marshal(p)
return string(b)
}
func AdvancedPermissionsPhase2ProgressFromJson(data io.Reader) *AdvancedPermissionsPhase2Progress {
var o *AdvancedPermissionsPhase2Progress
json.NewDecoder(data).Decode(&o)
return o
}
func (p *AdvancedPermissionsPhase2Progress) IsValid() bool {
if len(p.LastChannelId) != 26 {
return false
}
if len(p.LastTeamId) != 26 {
return false
}
if len(p.LastUserId) != 26 {
return false
}
switch p.CurrentTable {
case "TeamMembers":
case "ChannelMembers":
default:
return false
}
return true
}
func (worker *Worker) runAdvancedPermissionsPhase2Migration(lastDone string) (bool, string, *model.AppError) {
var progress *AdvancedPermissionsPhase2Progress
if len(lastDone) == 0 {
// Haven't started the migration yet.
progress = new(AdvancedPermissionsPhase2Progress)
progress.CurrentTable = "TeamMembers"
progress.LastChannelId = strings.Repeat("0", 26)
progress.LastTeamId = strings.Repeat("0", 26)
progress.LastUserId = strings.Repeat("0", 26)
} else {
progress = AdvancedPermissionsPhase2ProgressFromJson(strings.NewReader(lastDone))
if !progress.IsValid() {
return false, "", model.NewAppError("MigrationsWorker.runAdvancedPermissionsPhase2Migration", "migrations.worker.run_advanced_permissions_phase_2_migration.invalid_progress", map[string]interface{}{"progress": progress.ToJson()}, "", http.StatusInternalServerError)
}
}
if progress.CurrentTable == "TeamMembers" {
// Run a TeamMembers migration batch.
if result := <-worker.app.Srv.Store.Team().MigrateTeamMembers(progress.LastTeamId, progress.LastUserId); result.Err != nil {
return false, progress.ToJson(), result.Err
} else {
if result.Data == nil {
// We haven't progressed. That means that we've reached the end of this stage of the migration, and should now advance to the next stage.
progress.LastUserId = strings.Repeat("0", 26)
progress.CurrentTable = "ChannelMembers"
return false, progress.ToJson(), nil
}
data := result.Data.(map[string]string)
progress.LastTeamId = data["TeamId"]
progress.LastUserId = data["UserId"]
}
} else if progress.CurrentTable == "ChannelMembers" {
// Run a ChannelMembers migration batch.
if result := <-worker.app.Srv.Store.Channel().MigrateChannelMembers(progress.LastChannelId, progress.LastUserId); result.Err != nil {
return false, progress.ToJson(), result.Err
} else {
if result.Data == nil {
// We haven't progressed. That means we've reached the end of this final stage of the migration.
return true, progress.ToJson(), nil
}
data := result.Data.(map[string]string)
progress.LastChannelId = data["ChannelId"]
progress.LastUserId = data["UserId"]
}
}
return false, progress.ToJson(), nil
}
// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.
package migrations
import (
"github.com/mattermost/mattermost-server/app"
tjobs "github.com/mattermost/mattermost-server/jobs/interfaces"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store"
)
const (
MIGRATION_STATE_UNSCHEDULED = "unscheduled"
MIGRATION_STATE_IN_PROGRESS = "in_progress"
MIGRATION_STATE_COMPLETED = "completed"
JOB_DATA_KEY_MIGRATION = "migration_key"
JOB_DATA_KEY_MIGRATION_LAST_DONE = "last_done"
)
type MigrationsJobInterfaceImpl struct {
App *app.App
}
func init() {
app.RegisterJobsMigrationsJobInterface(func(a *app.App) tjobs.MigrationsJobInterface {
return &MigrationsJobInterfaceImpl{a}
})
}
func MakeMigrationsList() []string {
return []string{
model.MIGRATION_KEY_ADVANCED_PERMISSIONS_PHASE_2,
}
}
func GetMigrationState(migration string, store store.Store) (string, *model.Job, *model.AppError) {
if result := <-store.System().GetByName(migration); result.Err == nil {
return MIGRATION_STATE_COMPLETED, nil, nil
}
if result := <-store.Job().GetAllByType(model.JOB_TYPE_MIGRATIONS); result.Err != nil {
return "", nil, result.Err
} else {
for _, job := range result.Data.([]*model.Job) {
if key, ok := job.Data[JOB_DATA_KEY_MIGRATION]; ok {
if key != migration {
continue
}
switch job.Status {
case model.JOB_STATUS_IN_PROGRESS, model.JOB_STATUS_PENDING:
return MIGRATION_STATE_IN_PROGRESS, job, nil
default:
return MIGRATION_STATE_UNSCHEDULED, job, nil
}
}
}
}
return MIGRATION_STATE_UNSCHEDULED, nil, nil
}
// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.
package migrations
import (
"flag"
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/mattermost/mattermost-server/mlog"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store/storetest"
"github.com/mattermost/mattermost-server/utils"
)
func TestMain(m *testing.M) {
flag.Parse()
// Setup a global logger to catch tests logging outside of app context
// The global logger will be stomped by apps initalizing but that's fine for testing. Ideally this won't happen.
mlog.InitGlobalLogger(mlog.NewLogger(&mlog.LoggerConfiguration{
EnableConsole: true,
ConsoleJson: true,
ConsoleLevel: "error",
EnableFile: false,
}))
utils.TranslationsPreInit()
// In the case where a dev just wants to run a single test, it's faster to just use the default
// store.
if filter := flag.Lookup("test.run").Value.String(); filter != "" && filter != "." {
mlog.Info("-test.run used, not creating temporary containers")
os.Exit(m.Run())
}
status := 0
container, settings, err := storetest.NewMySQLContainer()
if err != nil {
panic(err)
}
UseTestStore(container, settings)
defer func() {
StopTestStore()
os.Exit(status)
}()
status = m.Run()
}
func TestGetMigrationState(t *testing.T) {
th := Setup()
defer th.TearDown()
migrationKey := model.NewId()
th.DeleteAllJobsByTypeAndMigrationKey(model.JOB_TYPE_MIGRATIONS, migrationKey)
// Test with no job yet.
state, job, err := GetMigrationState(migrationKey, th.App.Srv.Store)
assert.Nil(t, err)
assert.Nil(t, job)
assert.Equal(t, "unscheduled", state)
// Test with the system table showing the migration as done.
system := model.System{
Name: migrationKey,
Value: "true",
}
res1 := <-th.App.Srv.Store.System().Save(&system)
assert.Nil(t, res1.Err)
state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store)
assert.Nil(t, err)
assert.Nil(t, job)
assert.Equal(t, "completed", state)
res2 := <-th.App.Srv.Store.System().PermanentDeleteByName(migrationKey)
assert.Nil(t, res2.Err)
// Test with a job scheduled in "pending" state.
j1 := &model.Job{
Id: model.NewId(),
CreateAt: model.GetMillis(),
Data: map[string]string{
JOB_DATA_KEY_MIGRATION: migrationKey,
},
Status: model.JOB_STATUS_PENDING,
Type: model.JOB_TYPE_MIGRATIONS,
}
j1 = (<-th.App.Srv.Store.Job().Save(j1)).Data.(*model.Job)
state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store)
assert.Nil(t, err)
assert.Equal(t, j1.Id, job.Id)
assert.Equal(t, "in_progress", state)
// Test with a job scheduled in "in progress" state.
j2 := &model.Job{
Id: model.NewId(),
CreateAt: j1.CreateAt + 1,
Data: map[string]string{
JOB_DATA_KEY_MIGRATION: migrationKey,
},
Status: model.JOB_STATUS_IN_PROGRESS,
Type: model.JOB_TYPE_MIGRATIONS,
}
j2 = (<-th.App.Srv.Store.Job().Save(j2)).Data.(*model.Job)
state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store)
assert.Nil(t, err)
assert.Equal(t, j2.Id, job.Id)
assert.Equal(t, "in_progress", state)
// Test with a job scheduled in "error" state.
j3 := &model.Job{
Id: model.NewId(),
CreateAt: j2.CreateAt + 1,
Data: map[string]string{
JOB_DATA_KEY_MIGRATION: migrationKey,
},
Status: model.JOB_STATUS_ERROR,
Type: model.JOB_TYPE_MIGRATIONS,
}
j3 = (<-th.App.Srv.Store.Job().Save(j3)).Data.(*model.Job)
state, job, err = GetMigrationState(migrationKey, th.App.Srv.Store)
assert.Nil(t, err)
assert.Equal(t, j3.Id, job.Id)
assert.Equal(t, "unscheduled", state)
}
This diff is collapsed.
// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.
package migrations
import (
"time"
"github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/mlog"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/store"
)
const (
MIGRATION_JOB_WEDGED_TIMEOUT_MILLISECONDS = 3600000 // 1 hour
)
type Scheduler struct {
App *app.App
allMigrationsCompleted bool
}
func (m *MigrationsJobInterfaceImpl) MakeScheduler() model.Scheduler {
return &Scheduler{m.App, false}
}
func (scheduler *Scheduler) Name() string {
return "MigrationsScheduler"
}
func (scheduler *Scheduler) JobType() string {
return model.JOB_TYPE_MIGRATIONS
}
func (scheduler *Scheduler) Enabled(cfg *model.Config) bool {
return true
}
func (scheduler *Scheduler) NextScheduleTime(cfg *model.Config, now time.Time, pendingJobs bool, lastSuccessfulJob *model.Job) *time.Time {
if scheduler.allMigrationsCompleted {
return nil
}
nextTime := time.Now().Add(60 * time.Second)
return &nextTime
}
func (scheduler *Scheduler) ScheduleJob(cfg *model.Config, pendingJobs bool, lastSuccessfulJob *model.Job) (*model.Job, *model.AppError) {
mlog.Debug("Scheduling Job", mlog.String("scheduler", scheduler.Name()))
// Work through the list of migrations in order. Schedule the first one that isn't done (assuming it isn't in progress already).
for _, key := range MakeMigrationsList() {
state, job, err := GetMigrationState(key, scheduler.App.Srv.Store)
if err != nil {
mlog.Error("Failed to determine status of migration: ", mlog.String("scheduler", scheduler.Name()), mlog.String("migration_key", key), mlog.String("error", err.Error()))
return nil, nil
}
if state == MIGRATION_STATE_IN_PROGRESS {
// Check the migration job isn't wedged.
if job != nil && job.LastActivityAt < model.GetMillis()-MIGRATION_JOB_WEDGED_TIMEOUT_MILLISECONDS {
mlog.Warn("Job appears to be wedged. Rescheduling another instance.", mlog.String("scheduler", scheduler.Name()), mlog.String("wedged_job_id", job.Id), mlog.String("migration_key", key))
if err := scheduler.App.Jobs.SetJobError(job, nil); err != nil {
mlog.Error("Worker: Failed to set job error", mlog.String("scheduler", scheduler.Name()), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
}
return scheduler.createJob(key, job, scheduler.App.Srv.Store)
}
return nil, nil
}
if state == MIGRATION_STATE_COMPLETED {
// This migration is done. Continue to check the next.
continue
}
if state == MIGRATION_STATE_UNSCHEDULED {
mlog.Debug("Scheduling a new job for migration.", mlog.String("scheduler", scheduler.Name()), mlog.String("migration_key", key))
return scheduler.createJob(key, job, scheduler.App.Srv.Store)
}
mlog.Error("Unknown migration state. Not doing anything.", mlog.String("migration_state", state))
return nil, nil
}
// If we reached here, then there aren't any migrations left to run.
scheduler.allMigrationsCompleted = true
mlog.Debug("All migrations are complete.", mlog.String("scheduler", scheduler.Name()))
return nil, nil
}
func (scheduler *Scheduler) createJob(migrationKey string, lastJob *model.Job, store store.Store) (*model.Job, *model.AppError) {
var lastDone string
if lastJob != nil {
lastDone = lastJob.Data[JOB_DATA_KEY_MIGRATION_LAST_DONE]
}
data := map[string]string{
JOB_DATA_KEY_MIGRATION: migrationKey,
JOB_DATA_KEY_MIGRATION_LAST_DONE: lastDone,
}
if job, err := scheduler.App.Jobs.CreateJob(model.JOB_TYPE_MIGRATIONS, data); err != nil {
return nil, err
} else {
return job, nil
}
}
// Copyright (c) 2018-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.
package migrations
import (
"context"
"net/http"
"time"
"github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/jobs"
"github.com/mattermost/mattermost-server/mlog"
"github.com/mattermost/mattermost-server/model"
)
const (
TIME_BETWEEN_BATCHES = 100
)
type Worker struct {
name string
stop chan bool
stopped chan bool
jobs chan model.Job