Commit ad343a0f authored by Joram Wilander's avatar Joram Wilander Committed by GitHub

Added infrastructure for basic WebSocket API (#3432)

parent 06eacf30
......@@ -48,6 +48,8 @@ type Routes struct {
Public *mux.Router // 'api/v3/public'
Emoji *mux.Router // 'api/v3/emoji'
WebSocket *WebSocketRouter // websocket api
}
var BaseRoutes *Routes
......@@ -76,6 +78,8 @@ func InitApi() {
BaseRoutes.Public = BaseRoutes.ApiRoot.PathPrefix("/public").Subrouter()
BaseRoutes.Emoji = BaseRoutes.ApiRoot.PathPrefix("/emoji").Subrouter()
BaseRoutes.WebSocket = NewWebSocketRouter()
InitUser()
InitTeam()
InitChannel()
......
......@@ -103,6 +103,10 @@ func (me *TestHelper) CreateClient() *model.Client {
return model.NewClient("http://localhost" + utils.Cfg.ServiceSettings.ListenAddress)
}
func (me *TestHelper) CreateWebSocketClient() (*model.WebSocketClient, *model.AppError) {
return model.NewWebSocketClient("ws://localhost"+utils.Cfg.ServiceSettings.ListenAddress, me.BasicClient.AuthToken)
}
func (me *TestHelper) CreateTeam(client *model.Client) *model.Team {
id := model.NewId()
team := &model.Team{
......
......@@ -158,7 +158,7 @@ func CreateDirectChannel(userId string, otherUserId string) (*model.Channel, *mo
return nil, result.Err
}
} else {
message := model.NewMessage("", channel.Id, userId, model.ACTION_DIRECT_ADDED)
message := model.NewWebSocketEvent("", channel.Id, userId, model.WEBSOCKET_EVENT_DIRECT_ADDED)
message.Add("teammate_id", otherUserId)
go Publish(message)
......@@ -587,7 +587,7 @@ func AddUserToChannel(user *model.User, channel *model.Channel) (*model.ChannelM
go func() {
InvalidateCacheForUser(user.Id)
message := model.NewMessage(channel.TeamId, channel.Id, user.Id, model.ACTION_USER_ADDED)
message := model.NewWebSocketEvent(channel.TeamId, channel.Id, user.Id, model.WEBSOCKET_EVENT_USER_ADDED)
go Publish(message)
}()
......@@ -772,7 +772,7 @@ func deleteChannel(c *Context, w http.ResponseWriter, r *http.Request) {
go func() {
InvalidateCacheForChannel(channel.Id)
message := model.NewMessage(c.TeamId, channel.Id, c.Session.UserId, model.ACTION_CHANNEL_DELETED)
message := model.NewWebSocketEvent(c.TeamId, channel.Id, c.Session.UserId, model.WEBSOCKET_EVENT_CHANNEL_DELETED)
go Publish(message)
post := &model.Post{
......@@ -806,7 +806,7 @@ func updateLastViewedAt(c *Context, w http.ResponseWriter, r *http.Request) {
Srv.Store.Preference().Save(&model.Preferences{preference})
message := model.NewMessage(c.TeamId, id, c.Session.UserId, model.ACTION_CHANNEL_VIEWED)
message := model.NewWebSocketEvent(c.TeamId, id, c.Session.UserId, model.WEBSOCKET_EVENT_CHANNEL_VIEWED)
message.Add("channel_id", id)
go Publish(message)
......@@ -1032,7 +1032,7 @@ func RemoveUserFromChannel(userIdToRemove string, removerUserId string, channel
InvalidateCacheForUser(userIdToRemove)
message := model.NewMessage(channel.TeamId, channel.Id, userIdToRemove, model.ACTION_USER_REMOVED)
message := model.NewWebSocketEvent(channel.TeamId, channel.Id, userIdToRemove, model.WEBSOCKET_EVENT_USER_REMOVED)
message.Add("remover_id", removerUserId)
go Publish(message)
......
......@@ -69,7 +69,7 @@ func setCollapsePreference(c *Context, value string) *model.CommandResponse {
return &model.CommandResponse{Text: c.T("api.command_expand_collapse.fail.app_error"), ResponseType: model.COMMAND_RESPONSE_TYPE_EPHEMERAL}
}
socketMessage := model.NewMessage("", "", c.Session.UserId, model.ACTION_PREFERENCE_CHANGED)
socketMessage := model.NewWebSocketEvent("", "", c.Session.UserId, model.WEBSOCKET_EVENT_PREFERENCE_CHANGED)
socketMessage.Add("preference", pref.ToJson())
go Publish(socketMessage)
......
......@@ -21,6 +21,7 @@ func InitGeneral() {
BaseRoutes.General.Handle("/log_client", ApiAppHandler(logClient)).Methods("POST")
BaseRoutes.General.Handle("/ping", ApiAppHandler(ping)).Methods("GET")
BaseRoutes.WebSocket.Handle("ping", ApiWebSocketHandler(webSocketPing))
}
func getClientConfig(c *Context, w http.ResponseWriter, r *http.Request) {
......@@ -71,3 +72,8 @@ func ping(c *Context, w http.ResponseWriter, r *http.Request) {
m["node_id"] = ""
w.Write([]byte(model.MapToJson(m)))
}
func webSocketPing(req *model.WebSocketRequest, responseData map[string]interface{}) *model.AppError {
responseData["text"] = "pong"
return nil
}
......@@ -329,7 +329,7 @@ func makeDirectChannelVisible(teamId string, channelId string) {
if saveResult := <-Srv.Store.Preference().Save(&model.Preferences{*preference}); saveResult.Err != nil {
l4g.Error(utils.T("api.post.make_direct_channel_visible.save_pref.error"), member.UserId, otherUserId, saveResult.Err.Message)
} else {
message := model.NewMessage(teamId, channelId, member.UserId, model.ACTION_PREFERENCE_CHANGED)
message := model.NewWebSocketEvent(teamId, channelId, member.UserId, model.WEBSOCKET_EVENT_PREFERENCE_CHANGED)
message.Add("preference", preference.ToJson())
go Publish(message)
......@@ -344,7 +344,7 @@ func makeDirectChannelVisible(teamId string, channelId string) {
if updateResult := <-Srv.Store.Preference().Save(&model.Preferences{preference}); updateResult.Err != nil {
l4g.Error(utils.T("api.post.make_direct_channel_visible.update_pref.error"), member.UserId, otherUserId, updateResult.Err.Message)
} else {
message := model.NewMessage(teamId, channelId, member.UserId, model.ACTION_PREFERENCE_CHANGED)
message := model.NewWebSocketEvent(teamId, channelId, member.UserId, model.WEBSOCKET_EVENT_PREFERENCE_CHANGED)
message.Add("preference", preference.ToJson())
go Publish(message)
......@@ -627,7 +627,7 @@ func sendNotifications(c *Context, post *model.Post, team *model.Team, channel *
}
}
message := model.NewMessage(c.TeamId, post.ChannelId, post.UserId, model.ACTION_POSTED)
message := model.NewWebSocketEvent(c.TeamId, post.ChannelId, post.UserId, model.WEBSOCKET_EVENT_POSTED)
message.Add("post", post.ToJson())
message.Add("channel_type", channel.Type)
message.Add("channel_display_name", channel.DisplayName)
......@@ -905,7 +905,7 @@ func SendEphemeralPost(teamId, userId string, post *model.Post) {
post.Filenames = []string{}
}
message := model.NewMessage(teamId, post.ChannelId, userId, model.ACTION_EPHEMERAL_MESSAGE)
message := model.NewWebSocketEvent(teamId, post.ChannelId, userId, model.WEBSOCKET_EVENT_EPHEMERAL_MESSAGE)
message.Add("post", post.ToJson())
go Publish(message)
......@@ -967,7 +967,7 @@ func updatePost(c *Context, w http.ResponseWriter, r *http.Request) {
} else {
rpost := result.Data.(*model.Post)
message := model.NewMessage(c.TeamId, rpost.ChannelId, c.Session.UserId, model.ACTION_POST_EDITED)
message := model.NewWebSocketEvent(c.TeamId, rpost.ChannelId, c.Session.UserId, model.WEBSOCKET_EVENT_POST_EDITED)
message.Add("post", rpost.ToJson())
go Publish(message)
......@@ -1231,7 +1231,7 @@ func deletePost(c *Context, w http.ResponseWriter, r *http.Request) {
return
}
message := model.NewMessage(c.TeamId, post.ChannelId, c.Session.UserId, model.ACTION_POST_DELETED)
message := model.NewWebSocketEvent(c.TeamId, post.ChannelId, c.Session.UserId, model.WEBSOCKET_EVENT_POST_DELETED)
message.Add("post", post.ToJson())
go Publish(message)
......
......@@ -298,7 +298,7 @@ func JoinUserToTeam(team *model.Team, user *model.User) *model.AppError {
InvalidateCacheForUser(user.Id)
// This message goes to every channel, so the channelId is irrelevant
go Publish(model.NewMessage("", "", user.Id, model.ACTION_NEW_USER))
go Publish(model.NewWebSocketEvent("", "", user.Id, model.WEBSOCKET_EVENT_NEW_USER))
return nil
}
......@@ -348,7 +348,7 @@ func LeaveTeam(team *model.Team, user *model.User) *model.AppError {
RemoveAllSessionsForUserId(user.Id)
InvalidateCacheForUser(user.Id)
go Publish(model.NewMessage(team.Id, "", user.Id, model.ACTION_LEAVE_TEAM))
go Publish(model.NewWebSocketEvent(team.Id, "", user.Id, model.WEBSOCKET_EVENT_LEAVE_TEAM))
return nil
}
......
......@@ -75,6 +75,8 @@ func InitUser() {
BaseRoutes.Root.Handle("/login/sso/saml", AppHandlerIndependent(loginWithSaml)).Methods("GET")
BaseRoutes.Root.Handle("/login/sso/saml", AppHandlerIndependent(completeSaml)).Methods("POST")
BaseRoutes.WebSocket.Handle("user_typing", ApiWebSocketHandler(userTyping))
}
func createUser(c *Context, w http.ResponseWriter, r *http.Request) {
......@@ -269,7 +271,7 @@ func CreateUser(user *model.User) (*model.User, *model.AppError) {
ruser.Sanitize(map[string]bool{})
// This message goes to every channel, so the channelId is irrelevant
go Publish(model.NewMessage("", "", ruser.Id, model.ACTION_NEW_USER))
go Publish(model.NewWebSocketEvent("", "", ruser.Id, model.WEBSOCKET_EVENT_NEW_USER))
return ruser, nil
}
......@@ -2540,3 +2542,22 @@ func completeSaml(c *Context, w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, GetProtocol(r)+"://"+r.Host, http.StatusFound)
}
}
func userTyping(req *model.WebSocketRequest, responseData map[string]interface{}) *model.AppError {
var ok bool
var channelId string
if channelId, ok = req.Data["channel_id"].(string); !ok || len(channelId) != 26 {
return NewInvalidWebSocketParamError(req.Action, "channel_id")
}
var parentId string
if parentId, ok = req.Data["parent_id"].(string); !ok {
parentId = ""
}
event := model.NewWebSocketEvent("", channelId, req.Session.UserId, model.WEBSOCKET_EVENT_TYPING)
event.Add("parent_id", parentId)
go Publish(event)
return nil
}
......@@ -1719,3 +1719,85 @@ func TestCheckMfa(t *testing.T) {
// need to add more test cases when enterprise bits can be loaded into tests
}
func TestUserTyping(t *testing.T) {
th := Setup().InitBasic()
Client := th.BasicClient
WebSocketClient, err := th.CreateWebSocketClient()
if err != nil {
t.Fatal(err)
}
defer WebSocketClient.Close()
WebSocketClient.Listen()
WebSocketClient.UserTyping("", "")
time.Sleep(300 * time.Millisecond)
if resp := <-WebSocketClient.ResponseChannel; resp.Error.Id != "api.websocket_handler.invalid_param.app_error" {
t.Fatal("should have been invalid param response")
}
th.LoginBasic2()
Client.Must(Client.JoinChannel(th.BasicChannel.Id))
WebSocketClient2, err2 := th.CreateWebSocketClient()
if err2 != nil {
t.Fatal(err2)
}
defer WebSocketClient2.Close()
WebSocketClient2.Listen()
WebSocketClient.UserTyping(th.BasicChannel.Id, "")
time.Sleep(300 * time.Millisecond)
stop := make(chan bool)
eventHit := false
go func() {
for {
select {
case resp := <-WebSocketClient2.EventChannel:
if resp.Event == model.WEBSOCKET_EVENT_TYPING && resp.UserId == th.BasicUser.Id {
eventHit = true
}
case <-stop:
return
}
}
}()
time.Sleep(300 * time.Millisecond)
stop <- true
if !eventHit {
t.Fatal("did not receive typing event")
}
WebSocketClient.UserTyping(th.BasicChannel.Id, "someparentid")
time.Sleep(300 * time.Millisecond)
eventHit = false
go func() {
for {
select {
case resp := <-WebSocketClient2.EventChannel:
if resp.Event == model.WEBSOCKET_EVENT_TYPING && resp.Data["parent_id"] == "someparentid" {
eventHit = true
}
case <-stop:
return
}
}
}()
time.Sleep(300 * time.Millisecond)
stop <- true
if !eventHit {
t.Fatal("did not receive typing event")
}
}
......@@ -6,10 +6,12 @@ package api
import (
"time"
l4g "github.com/alecthomas/log4go"
"github.com/gorilla/websocket"
"github.com/mattermost/platform/model"
"github.com/mattermost/platform/utils"
l4g "github.com/alecthomas/log4go"
"github.com/gorilla/websocket"
goi18n "github.com/nicksnyder/go-i18n/i18n"
)
const (
......@@ -22,32 +24,36 @@ const (
type WebConn struct {
WebSocket *websocket.Conn
Send chan *model.Message
Send chan model.WebSocketMessage
SessionToken string
UserId string
T goi18n.TranslateFunc
Locale string
hasPermissionsToChannel map[string]bool
hasPermissionsToTeam map[string]bool
}
func NewWebConn(ws *websocket.Conn, userId string, sessionToken string) *WebConn {
func NewWebConn(c *Context, ws *websocket.Conn) *WebConn {
go func() {
achan := Srv.Store.User().UpdateUserAndSessionActivity(userId, sessionToken, model.GetMillis())
pchan := Srv.Store.User().UpdateLastPingAt(userId, model.GetMillis())
achan := Srv.Store.User().UpdateUserAndSessionActivity(c.Session.UserId, c.Session.Token, model.GetMillis())
pchan := Srv.Store.User().UpdateLastPingAt(c.Session.UserId, model.GetMillis())
if result := <-achan; result.Err != nil {
l4g.Error(utils.T("api.web_conn.new_web_conn.last_activity.error"), userId, sessionToken, result.Err)
l4g.Error(utils.T("api.web_conn.new_web_conn.last_activity.error"), c.Session.UserId, c.Session.Token, result.Err)
}
if result := <-pchan; result.Err != nil {
l4g.Error(utils.T("api.web_conn.new_web_conn.last_ping.error"), userId, result.Err)
l4g.Error(utils.T("api.web_conn.new_web_conn.last_ping.error"), c.Session.UserId, result.Err)
}
}()
return &WebConn{
Send: make(chan *model.Message, 64),
Send: make(chan model.WebSocketMessage, 64),
WebSocket: ws,
UserId: userId,
SessionToken: sessionToken,
UserId: c.Session.UserId,
SessionToken: c.Session.Token,
T: c.T,
Locale: c.Locale,
hasPermissionsToChannel: make(map[string]bool),
hasPermissionsToTeam: make(map[string]bool),
}
......@@ -73,12 +79,11 @@ func (c *WebConn) readPump() {
})
for {
var msg model.Message
if err := c.WebSocket.ReadJSON(&msg); err != nil {
var req model.WebSocketRequest
if err := c.WebSocket.ReadJSON(&req); err != nil {
return
} else {
msg.UserId = c.UserId
go Publish(&msg)
BaseRoutes.WebSocket.ServeWebSocket(c, &req)
}
}
}
......
......@@ -13,7 +13,7 @@ type Hub struct {
connections map[*WebConn]bool
register chan *WebConn
unregister chan *WebConn
broadcast chan *model.Message
broadcast chan *model.WebSocketEvent
stop chan string
invalidateUser chan string
invalidateChannel chan string
......@@ -23,13 +23,13 @@ var hub = &Hub{
register: make(chan *WebConn),
unregister: make(chan *WebConn),
connections: make(map[*WebConn]bool),
broadcast: make(chan *model.Message),
broadcast: make(chan *model.WebSocketEvent),
stop: make(chan string),
invalidateUser: make(chan string),
invalidateChannel: make(chan string),
}
func Publish(message *model.Message) {
func Publish(message *model.WebSocketEvent) {
hub.Broadcast(message)
}
......@@ -49,7 +49,7 @@ func (h *Hub) Unregister(webConn *WebConn) {
h.unregister <- webConn
}
func (h *Hub) Broadcast(message *model.Message) {
func (h *Hub) Broadcast(message *model.WebSocketEvent) {
if message != nil {
h.broadcast <- message
}
......@@ -108,11 +108,10 @@ func (h *Hub) Start() {
}()
}
func shouldSendEvent(webCon *WebConn, msg *model.Message) bool {
func shouldSendEvent(webCon *WebConn, msg *model.WebSocketEvent) bool {
if webCon.UserId == msg.UserId {
// Don't need to tell the user they are typing
if msg.Action == model.ACTION_TYPING {
if msg.Event == model.WEBSOCKET_EVENT_TYPING {
return false
}
......@@ -127,11 +126,11 @@ func shouldSendEvent(webCon *WebConn, msg *model.Message) bool {
}
} else {
// Don't share a user's view or preference events with other users
if msg.Action == model.ACTION_CHANNEL_VIEWED {
if msg.Event == model.WEBSOCKET_EVENT_CHANNEL_VIEWED {
return false
} else if msg.Action == model.ACTION_PREFERENCE_CHANGED {
} else if msg.Event == model.WEBSOCKET_EVENT_PREFERENCE_CHANGED {
return false
} else if msg.Action == model.ACTION_EPHEMERAL_MESSAGE {
} else if msg.Event == model.WEBSOCKET_EVENT_EPHEMERAL_MESSAGE {
// For now, ephemeral messages are sent directly to individual users
return false
}
......@@ -146,7 +145,7 @@ func shouldSendEvent(webCon *WebConn, msg *model.Message) bool {
}
// Only report events to users who are in the channel for the event execept deleted events
if len(msg.ChannelId) > 0 && msg.Action != model.ACTION_CHANNEL_DELETED {
if len(msg.ChannelId) > 0 && msg.Event != model.WEBSOCKET_EVENT_CHANNEL_DELETED {
allowed := webCon.HasPermissionsToChannel(msg.ChannelId)
if !allowed {
......
// Copyright (c) 2015 Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.
package api
import (
"github.com/gorilla/websocket"
"github.com/mattermost/platform/model"
"github.com/mattermost/platform/utils"
"net/http"
"testing"
"time"
)
func TestSocket(t *testing.T) {
th := Setup().InitBasic()
Client := th.BasicClient
team := th.BasicTeam
channel1 := th.BasicChannel
channel2 := th.CreateChannel(Client, team)
Client.Must(Client.AddChannelMember(channel1.Id, th.BasicUser2.Id))
url := "ws://localhost" + utils.Cfg.ServiceSettings.ListenAddress + model.API_URL_SUFFIX + "/users/websocket"
header1 := http.Header{}
header1.Set(model.HEADER_AUTH, "BEARER "+Client.AuthToken)
c1, _, err := websocket.DefaultDialer.Dial(url, header1)
if err != nil {
t.Fatal(err)
}
th.LoginBasic2()
header2 := http.Header{}
header2.Set(model.HEADER_AUTH, "BEARER "+Client.AuthToken)
c2, _, err := websocket.DefaultDialer.Dial(url, header2)
if err != nil {
t.Fatal(err)
}
time.Sleep(300 * time.Millisecond)
var rmsg model.Message
// Test sending message without a channelId
m := model.NewMessage(team.Id, "", "", model.ACTION_TYPING)
m.Add("RootId", model.NewId())
m.Add("ParentId", model.NewId())
c1.WriteJSON(m)
if err := c2.ReadJSON(&rmsg); err != nil {
t.Fatal(err)
}
t.Log(rmsg.ToJson())
if team.Id != rmsg.TeamId {
t.Fatal("Ids do not match")
}
if m.Props["RootId"] != rmsg.Props["RootId"] {
t.Fatal("Ids do not match")
}
// Test sending messsage to Channel you have access to
m = model.NewMessage(team.Id, channel1.Id, "", model.ACTION_TYPING)
m.Add("RootId", model.NewId())
m.Add("ParentId", model.NewId())
c1.WriteJSON(m)
if err := c2.ReadJSON(&rmsg); err != nil {
t.Fatal(err)
}
if team.Id != rmsg.TeamId {
t.Fatal("Ids do not match")
}
if m.Props["RootId"] != rmsg.Props["RootId"] {
t.Fatal("Ids do not match")
}
// Test sending message to Channel you *do not* have access too
m = model.NewMessage("", channel2.Id, "", model.ACTION_TYPING)
m.Add("RootId", model.NewId())
m.Add("ParentId", model.NewId())
c1.WriteJSON(m)
go func() {
if err := c2.ReadJSON(&rmsg); err != nil {
t.Fatal(err)
}
t.Fatal(err)
}()
time.Sleep(2 * time.Second)
}
......@@ -8,7 +8,6 @@ import (
"github.com/mattermost/platform/model"
"github.com/mattermost/platform/utils"
"testing"
"time"
)
func TestCreateIncomingHook(t *testing.T) {
......@@ -629,12 +628,3 @@ func TestIncomingWebhooks(t *testing.T) {
t.Fatal("should have failed - webhooks turned off")
}
}
func TestZZWebSocketTearDown(t *testing.T) {
// *IMPORTANT* - Kind of hacky
// This should be the last function in any test file
// that calls Setup()
// Should be in the last file too sorted by name
time.Sleep(2 * time.Second)
TearDown()
}
......@@ -33,7 +33,7 @@ func connect(c *Context, w http.ResponseWriter, r *http.Request) {
return
}
wc := NewWebConn(ws, c.Session.UserId, c.Session.Token)
wc := NewWebConn(c, ws)
hub.Register(wc)
go wc.writePump()
wc.readPump()
......
// Copyright (c) 2016 Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.
package api
import (
l4g "github.com/alecthomas/log4go"
"github.com/mattermost/platform/model"
"github.com/mattermost/platform/utils"
)
func ApiWebSocketHandler(wh func(*model.WebSocketRequest, map[string]interface{}) *model.AppError) *webSocketHandler {
return &webSocketHandler{wh}
}
type webSocketHandler struct {
handlerFunc func(*model.WebSocketRequest, map[string]interface{}) *model.AppError
}
func (wh *webSocketHandler) ServeWebSocket(conn *WebConn, r *model.WebSocketRequest) {
l4g.Debug("/api/v3/users/websocket:%s", r.Action)
r.Session = *GetSession(conn.SessionToken)
r.T = conn.T
r.Locale = conn.Locale
data := make(map[string]interface{})
if err := wh.handlerFunc(r, data); err != nil {
l4g.Error(utils.T("api.web_socket_handler.log.error"), "/api/v3/users/websocket", r.Action, r.Seq, r.Session.UserId, err.SystemMessage(utils.T), err.DetailedError)
err.DetailedError = ""
conn.Send <- model.NewWebSocketError(r.Seq, err)
return
}
conn.Send <- model.NewWebSocketResponse(model.STATUS_OK, r.Seq, data)
}
func NewInvalidWebSocketParamError(action string, name string) *model.AppError {
return model.NewLocAppError("/api/v3/users/websocket:"+action, "api.websocket_handler.invalid_param.app_error", map[string]interface{}{"Name": name}, "")
}