Commit 6c826e76 authored by Jesse Hallam's avatar Jesse Hallam Committed by Christopher Speller
Browse files

materialize PublicChannels without triggers (#9424)

Creating triggers requires SUPERUSER privileges, and is especially
painful on RDS. Pivot to maintaining this denormalized table in code.
parent 6c2a5555
......@@ -4914,10 +4914,34 @@
"id": "store.sql_channel.permanent_delete.app_error",
"translation": "Unable to delete the channel"
},
{
"id": "store.sql_channel.permanent_delete.commit_transaction.app_error",
"translation": "Unable to commit transaction"
},
{
"id": "store.sql_channel.permanent_delete.delete_public_channel.app_error",
"translation": "Unable to delete materialized public channel"
},
{
"id": "store.sql_channel.permanent_delete.open_transaction.app_error",
"translation": "Unable to open transaction"
},
{
"id": "store.sql_channel.permanent_delete_by_team.app_error",
"translation": "Unable to delete the channels"
},
{
"id": "store.sql_channel.permanent_delete_by_team.commit_transaction.app_error",
"translation": "Unable to commit transaction"
},
{
"id": "store.sql_channel.permanent_delete_by_team.delete_public_channels.app_error",
"translation": "Unable to delete materialized public channels"
},
{
"id": "store.sql_channel.permanent_delete_by_team.open_transaction.app_error",
"translation": "Unable to open transaction"
},
{
"id": "store.sql_channel.permanent_delete_members_by_user.app_error",
"translation": "Unable to remove the channel member"
......@@ -4934,10 +4958,26 @@
"id": "store.sql_channel.reset_all_channel_schemes.app_error",
"translation": "We could not reset the channel schemes"
},
{
"id": "store.sql_channel.reset_all_channel_schemes.commit_transaction.app_error",
"translation": "Unable to commit transaction"
},
{
"id": "store.sql_channel.reset_all_channel_schemes.open_transaction.app_error",
"translation": "Unable to open transaction"
},
{
"id": "store.sql_channel.reset_last_post_at.app_error",
"translation": "We could not reset the channel last post at date"
},
{
"id": "store.sql_channel.reset_last_post_at.commit_transaction.app_error",
"translation": "Unable to commit transaction"
},
{
"id": "store.sql_channel.reset_last_post_at.open_transaction.app_error",
"translation": "Unable to open transaction"
},
{
"id": "store.sql_channel.save.archived_channel.app_error",
"translation": "You can not modify an archived channel"
......@@ -4954,6 +4994,10 @@
"id": "store.sql_channel.save.open_transaction.app_error",
"translation": "Unable to open transaction"
},
{
"id": "store.sql_channel.save.upsert_public_channel.app_error",
"translation": "Unable to upsert materialized public channel"
},
{
"id": "store.sql_channel.save_channel.current_count.app_error",
"translation": "Failed to get current channel count"
......@@ -5014,6 +5058,18 @@
"id": "store.sql_channel.search.app_error",
"translation": "We encountered an error searching channels"
},
{
"id": "store.sql_channel.set_delete_at.commit_transaction.app_error",
"translation": "Unable to commit transaction"
},
{
"id": "store.sql_channel.set_delete_at.open_transaction.app_error",
"translation": "Unable to open transaction"
},
{
"id": "store.sql_channel.set_delete_at.update_public_channel.app_error",
"translation": "Unable to update the materialized public channel"
},
{
"id": "store.sql_channel.update.app_error",
"translation": "Unable to update the channel"
......@@ -5022,10 +5078,18 @@
"id": "store.sql_channel.update.archived_channel.app_error",
"translation": "You can not modify an archived channel"
},
{
"id": "store.sql_channel.update.commit_transaction.app_error",
"translation": "Unable to commit transaction"
},
{
"id": "store.sql_channel.update.exists.app_error",
"translation": "A channel with that handle already exists"
},
{
"id": "store.sql_channel.update.open_transaction.app_error",
"translation": "Unable to open transaction"
},
{
"id": "store.sql_channel.update.previously.app_error",
"translation": "A channel with that handle was previously created"
......@@ -5034,6 +5098,10 @@
"id": "store.sql_channel.update.updating.app_error",
"translation": "We encountered an error updating the channel"
},
{
"id": "store.sql_channel.update.upsert_public_channel.app_error",
"translation": "Unable to upsert materialized public channel"
},
{
"id": "store.sql_channel.update_last_viewed_at.app_error",
"translation": "Unable to update the last viewed at time"
......
......@@ -301,11 +301,6 @@ func (s SqlChannelStore) CreateIndexesIfNotExists() {
s.CreateFullTextIndexIfNotExists("idx_channel_search_txt", "Channels", "Name, DisplayName, Purpose")
}
func (s SqlChannelStore) CreateTriggersIfNotExists() error {
// See SqlChannelStoreExperimental
return nil
}
func (s SqlChannelStore) MigratePublicChannels() error {
// See SqlChannelStoreExperimental
return nil
......@@ -316,6 +311,9 @@ func (s SqlChannelStore) DropPublicChannels() error {
return nil
}
// Save writes the (non-direct) channel channel to the database.
//
// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table.
func (s SqlChannelStore) Save(channel *model.Channel, maxChannelsPerTeam int64) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
if channel.DeleteAt != 0 {
......@@ -474,42 +472,68 @@ func (s SqlChannelStore) saveChannelT(transaction *gorp.Transaction, channel *mo
return result
}
// Update writes the updated channel to the database.
//
// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table.
func (s SqlChannelStore) Update(channel *model.Channel) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
channel.PreUpdate()
if channel.DeleteAt != 0 {
result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.archived_channel.app_error", nil, "", http.StatusBadRequest)
transaction, err := s.GetMaster().Begin()
if err != nil {
result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
return
}
if result.Err = channel.IsValid(); result.Err != nil {
*result = s.updateChannelT(transaction, channel)
if result.Err != nil {
transaction.Rollback()
return
}
count, err := s.GetMaster().Update(channel)
if err != nil {
if IsUniqueConstraintError(err, []string{"Name", "channels_name_teamid_key"}) {
dupChannel := model.Channel{}
s.GetReplica().SelectOne(&dupChannel, "SELECT * FROM Channels WHERE TeamId = :TeamId AND Name= :Name AND DeleteAt > 0", map[string]interface{}{"TeamId": channel.TeamId, "Name": channel.Name})
if dupChannel.DeleteAt > 0 {
result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.previously.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusBadRequest)
return
}
result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.exists.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusBadRequest)
return
}
result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.updating.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusInternalServerError)
if err := transaction.Commit(); err != nil {
result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
return
}
})
}
if count != 1 {
result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.app_error", nil, "id="+channel.Id, http.StatusInternalServerError)
return
func (s SqlChannelStore) updateChannelT(transaction *gorp.Transaction, channel *model.Channel) store.StoreResult {
result := store.StoreResult{}
channel.PreUpdate()
if channel.DeleteAt != 0 {
result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.archived_channel.app_error", nil, "", http.StatusBadRequest)
return result
}
if result.Err = channel.IsValid(); result.Err != nil {
return result
}
count, err := transaction.Update(channel)
if err != nil {
if IsUniqueConstraintError(err, []string{"Name", "channels_name_teamid_key"}) {
dupChannel := model.Channel{}
s.GetReplica().SelectOne(&dupChannel, "SELECT * FROM Channels WHERE TeamId = :TeamId AND Name= :Name AND DeleteAt > 0", map[string]interface{}{"TeamId": channel.TeamId, "Name": channel.Name})
if dupChannel.DeleteAt > 0 {
result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.previously.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusBadRequest)
return result
}
result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.exists.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusBadRequest)
return result
}
result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.updating.app_error", nil, "id="+channel.Id+", "+err.Error(), http.StatusInternalServerError)
return result
}
result.Data = channel
})
if count != 1 {
result.Err = model.NewAppError("SqlChannelStore.Update", "store.sql_channel.update.app_error", nil, "id="+channel.Id, http.StatusInternalServerError)
return result
}
result.Data = channel
return result
}
func (s SqlChannelStore) GetChannelUnread(channelId, userId string) store.StoreChannel {
......@@ -617,39 +641,126 @@ func (s SqlChannelStore) get(id string, master bool, allowFromCache bool) store.
})
}
// Delete records the given deleted timestamp to the channel in question.
//
// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table.
func (s SqlChannelStore) Delete(channelId string, time int64) store.StoreChannel {
return s.SetDeleteAt(channelId, time, time)
}
// Restore reverts a previous deleted timestamp from the channel in question.
//
// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table.
func (s SqlChannelStore) Restore(channelId string, time int64) store.StoreChannel {
return s.SetDeleteAt(channelId, 0, time)
}
func (s SqlChannelStore) SetDeleteAt(channelId string, deleteAt int64, updateAt int64) store.StoreChannel {
// SetDeleteAt records the given deleted and updated timestamp to the channel in question.
//
// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table.
func (s SqlChannelStore) SetDeleteAt(channelId string, deleteAt, updateAt int64) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
_, err := s.GetMaster().Exec("Update Channels SET DeleteAt = :DeleteAt, UpdateAt = :UpdateAt WHERE Id = :ChannelId", map[string]interface{}{"DeleteAt": deleteAt, "UpdateAt": updateAt, "ChannelId": channelId})
transaction, err := s.GetMaster().Begin()
if err != nil {
result.Err = model.NewAppError("SqlChannelStore.Delete", "store.sql_channel.delete.channel.app_error", nil, "id="+channelId+", err="+err.Error(), http.StatusInternalServerError)
result.Err = model.NewAppError("SqlChannelStore.SetDeleteAt", "store.sql_channel.set_delete_at.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
return
}
*result = s.setDeleteAtT(transaction, channelId, deleteAt, updateAt)
if result.Err != nil {
transaction.Rollback()
return
}
if err := transaction.Commit(); err != nil {
result.Err = model.NewAppError("SqlChannelStore.SetDeleteAt", "store.sql_channel.set_delete_at.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
return
}
})
}
func (s SqlChannelStore) setDeleteAtT(transaction *gorp.Transaction, channelId string, deleteAt, updateAt int64) store.StoreResult {
result := store.StoreResult{}
_, err := transaction.Exec("Update Channels SET DeleteAt = :DeleteAt, UpdateAt = :UpdateAt WHERE Id = :ChannelId", map[string]interface{}{"DeleteAt": deleteAt, "UpdateAt": updateAt, "ChannelId": channelId})
if err != nil {
result.Err = model.NewAppError("SqlChannelStore.Delete", "store.sql_channel.delete.channel.app_error", nil, "id="+channelId+", err="+err.Error(), http.StatusInternalServerError)
return result
}
return result
}
// PermanentDeleteByTeam removes all channels for the given team from the database.
//
// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table.
func (s SqlChannelStore) PermanentDeleteByTeam(teamId string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
if _, err := s.GetMaster().Exec("DELETE FROM Channels WHERE TeamId = :TeamId", map[string]interface{}{"TeamId": teamId}); err != nil {
result.Err = model.NewAppError("SqlChannelStore.PermanentDeleteByTeam", "store.sql_channel.permanent_delete_by_team.app_error", nil, "teamId="+teamId+", "+err.Error(), http.StatusInternalServerError)
transaction, err := s.GetMaster().Begin()
if err != nil {
result.Err = model.NewAppError("SqlChannelStore.PermanentDeleteByTeam", "store.sql_channel.permanent_delete_by_team.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
return
}
*result = s.permanentDeleteByTeamtT(transaction, teamId)
if result.Err != nil {
transaction.Rollback()
return
}
if err := transaction.Commit(); err != nil {
result.Err = model.NewAppError("SqlChannelStore.PermanentDeleteByTeam", "store.sql_channel.permanent_delete_by_team.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
return
}
})
}
func (s SqlChannelStore) permanentDeleteByTeamtT(transaction *gorp.Transaction, teamId string) store.StoreResult {
result := store.StoreResult{}
if _, err := transaction.Exec("DELETE FROM Channels WHERE TeamId = :TeamId", map[string]interface{}{"TeamId": teamId}); err != nil {
result.Err = model.NewAppError("SqlChannelStore.PermanentDeleteByTeam", "store.sql_channel.permanent_delete_by_team.app_error", nil, "teamId="+teamId+", "+err.Error(), http.StatusInternalServerError)
return result
}
return result
}
// PermanentDelete removes the given channel from the database.
//
// @see ChannelStoreExperimental for how this update propagates to the PublicChannels table.
func (s SqlChannelStore) PermanentDelete(channelId string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
if _, err := s.GetMaster().Exec("DELETE FROM Channels WHERE Id = :ChannelId", map[string]interface{}{"ChannelId": channelId}); err != nil {
result.Err = model.NewAppError("SqlChannelStore.PermanentDelete", "store.sql_channel.permanent_delete.app_error", nil, "channel_id="+channelId+", "+err.Error(), http.StatusInternalServerError)
transaction, err := s.GetMaster().Begin()
if err != nil {
result.Err = model.NewAppError("SqlChannelStore.PermanentDelete", "store.sql_channel.permanent_delete.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
return
}
*result = s.permanentDeleteT(transaction, channelId)
if result.Err != nil {
transaction.Rollback()
return
}
if err := transaction.Commit(); err != nil {
result.Err = model.NewAppError("SqlChannelStore.PermanentDelete", "store.sql_channel.permanent_delete.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
return
}
})
}
func (s SqlChannelStore) permanentDeleteT(transaction *gorp.Transaction, channelId string) store.StoreResult {
result := store.StoreResult{}
if _, err := transaction.Exec("DELETE FROM Channels WHERE Id = :ChannelId", map[string]interface{}{"ChannelId": channelId}); err != nil {
result.Err = model.NewAppError("SqlChannelStore.PermanentDelete", "store.sql_channel.permanent_delete.app_error", nil, "channel_id="+channelId+", "+err.Error(), http.StatusInternalServerError)
return result
}
return result
}
func (s SqlChannelStore) PermanentDeleteMembersByChannel(channelId string) store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
_, err := s.GetMaster().Exec("DELETE FROM ChannelMembers WHERE ChannelId = :ChannelId", map[string]interface{}{"ChannelId": channelId})
......@@ -1914,12 +2025,36 @@ func (s SqlChannelStore) MigrateChannelMembers(fromChannelId string, fromUserId
func (s SqlChannelStore) ResetAllChannelSchemes() store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
if _, err := s.GetMaster().Exec("UPDATE Channels SET SchemeId=''"); err != nil {
result.Err = model.NewAppError("SqlChannelStore.ResetAllChannelSchemes", "store.sql_channel.reset_all_channel_schemes.app_error", nil, err.Error(), http.StatusInternalServerError)
transaction, err := s.GetMaster().Begin()
if err != nil {
result.Err = model.NewAppError("SqlChannelStore.ResetAllChannelSchemes", "store.sql_channel.reset_all_channel_schemes.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
return
}
*result = s.resetAllChannelSchemesT(transaction)
if result.Err != nil {
transaction.Rollback()
return
}
if err := transaction.Commit(); err != nil {
result.Err = model.NewAppError("SqlChannelStore.ResetAllChannelSchemes", "store.sql_channel.reset_all_channel_schemes.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
return
}
})
}
func (s SqlChannelStore) resetAllChannelSchemesT(transaction *gorp.Transaction) store.StoreResult {
result := store.StoreResult{}
if _, err := transaction.Exec("UPDATE Channels SET SchemeId=''"); err != nil {
result.Err = model.NewAppError("SqlChannelStore.ResetAllChannelSchemes", "store.sql_channel.reset_all_channel_schemes.app_error", nil, err.Error(), http.StatusInternalServerError)
return result
}
return result
}
func (s SqlChannelStore) ClearAllCustomRoleAssignments() store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
builtInRoles := model.MakeDefaultRoles()
......@@ -1991,19 +2126,43 @@ func (s SqlChannelStore) ClearAllCustomRoleAssignments() store.StoreChannel {
func (s SqlChannelStore) ResetLastPostAt() store.StoreChannel {
return store.Do(func(result *store.StoreResult) {
var query string
if s.DriverName() == model.DATABASE_DRIVER_POSTGRES {
query = "UPDATE Channels SET LastPostAt = COALESCE((SELECT UpdateAt FROM Posts WHERE ChannelId = Channels.Id ORDER BY UpdateAt DESC LIMIT 1), Channels.CreateAt);"
} else {
query = "UPDATE Channels SET LastPostAt = IFNULL((SELECT UpdateAt FROM Posts WHERE ChannelId = Channels.Id ORDER BY UpdateAt DESC LIMIT 1), Channels.CreateAt);"
transaction, err := s.GetMaster().Begin()
if err != nil {
result.Err = model.NewAppError("SqlChannelStore.ResetLastPostAt", "store.sql_channel.reset_last_post_at.open_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
return
}
*result = s.resetLastPostAtT(transaction)
if result.Err != nil {
transaction.Rollback()
return
}
if _, err := s.GetMaster().Exec(query); err != nil {
result.Err = model.NewAppError("SqlChannelStore.ResetLastPostAt", "store.sql_channel.reset_last_post_at.app_error", nil, err.Error(), http.StatusInternalServerError)
if err := transaction.Commit(); err != nil {
result.Err = model.NewAppError("SqlChannelStore.ResetLastPostAt", "store.sql_channel.reset_last_post_at.commit_transaction.app_error", nil, err.Error(), http.StatusInternalServerError)
return
}
})
}
func (s SqlChannelStore) resetLastPostAtT(transaction *gorp.Transaction) store.StoreResult {
result := store.StoreResult{}
var query string
if s.DriverName() == model.DATABASE_DRIVER_POSTGRES {
query = "UPDATE Channels SET LastPostAt = COALESCE((SELECT UpdateAt FROM Posts WHERE ChannelId = Channels.Id ORDER BY UpdateAt DESC LIMIT 1), Channels.CreateAt);"
} else {
query = "UPDATE Channels SET LastPostAt = IFNULL((SELECT UpdateAt FROM Posts WHERE ChannelId = Channels.Id ORDER BY UpdateAt DESC LIMIT 1), Channels.CreateAt);"
}
if _, err := transaction.Exec(query); err != nil {
result.Err = model.NewAppError("SqlChannelStore.ResetLastPostAt", "store.sql_channel.reset_last_post_at.app_error", nil, err.Error(), http.StatusInternalServerError)
return result
}
return result
}
func (s SqlChannelStore) EnableExperimentalPublicChannelsMaterialization() {
// See SqlChannelStoreExperimental
}
......
......@@ -157,15 +157,6 @@ func NewSqlSupplier(settings model.SqlSettings, metrics einterfaces.MetricsInter
os.Exit(EXIT_CREATE_TABLE)
}
// This store's triggers should exist before the migration is run to ensure the
// corresponding tables stay in sync. Whether or not a trigger should be created before
// or after a migration is likely to be decided on a case-by-case basis.
if err := supplier.oldStores.channel.(*SqlChannelStoreExperimental).CreateTriggersIfNotExists(); err != nil {
mlog.Critical("Error creating triggers", mlog.Err(err))
time.Sleep(time.Second)
os.Exit(EXIT_GENERIC_FAILURE)
}
UpgradeDatabase(supplier)
supplier.oldStores.team.(*SqlTeamStore).CreateIndexesIfNotExists()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment