Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ require (
filippo.io/edwards25519 v1.2.0 // indirect
github.com/andybalholm/brotli v1.2.0 // indirect
github.com/clipperhouse/uax29/v2 v2.7.0 // indirect
github.com/fasthttp/websocket v1.5.3 // indirect
github.com/gabriel-vasile/mimetype v1.4.13 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-sql-driver/mysql v1.9.3 // indirect
github.com/gofiber/websocket/v2 v2.2.1 // indirect
github.com/gosimple/unidecode v1.0.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
Expand All @@ -49,6 +51,7 @@ require (
github.com/mfridman/interpolate v0.0.2 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect
github.com/sethvargo/go-retry v0.3.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.69.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/fasthttp/websocket v1.5.3 h1:TPpQuLwJYfd4LJPXvHDYPMFWbLjsT91n3GpWtCQtdek=
github.com/fasthttp/websocket v1.5.3/go.mod h1:46gg/UBmTU1kUaTcwQXpUxtRwG2PvIZYeA8oL6vF3Fs=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gabriel-vasile/mimetype v1.4.12 h1:e9hWvmLYvtp846tLHam2o++qitpguFiYCKbn0w9jyqw=
github.com/gabriel-vasile/mimetype v1.4.12/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s=
Expand All @@ -48,6 +50,8 @@ github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI6
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofiber/fiber/v2 v2.52.12 h1:0LdToKclcPOj8PktUdIKo9BUohjjwfnQl42Dhw8/WUw=
github.com/gofiber/fiber/v2 v2.52.12/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw=
github.com/gofiber/websocket/v2 v2.2.1 h1:C9cjxvloojayOp9AovmpQrk8VqvVnT8Oao3+IUygH7w=
github.com/gofiber/websocket/v2 v2.2.1/go.mod h1:Ao/+nyNnX5u/hIFPuHl28a+NIkrqK7PRimyKaj4JxVU=
github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXeUI=
github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA=
Expand Down Expand Up @@ -144,6 +148,8 @@ github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY=
github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ=
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk=
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g=
github.com/sethvargo/go-retry v0.3.0 h1:EEt31A35QhrcRZtrYFDTBg91cqZVnFL2navjDrah2SE=
github.com/sethvargo/go-retry v0.3.0/go.mod h1:mNX17F0C/HguQMyMyJxcnU471gOZGxCLyYaFyAZraas=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
101 changes: 101 additions & 0 deletions infrastructure/collaboration/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package collaboration

import (
"strings"

"github.com/gofiber/fiber/v2"
"github.com/gofiber/websocket/v2"
"github.com/labbs/nexo/application/session"
"github.com/labbs/nexo/application/session/dto"
"github.com/rs/zerolog"
)

const wsCollabPrefix = "/ws/collab/"

// Handler manages WebSocket connections for Y.js collaboration.
type Handler struct {
hub *Hub
sessionApp *session.SessionApplication
logger zerolog.Logger
}

// NewHandler creates a new collaboration WebSocket handler.
func NewHandler(hub *Hub, sessionApp *session.SessionApplication, logger zerolog.Logger) *Handler {
return &Handler{
hub: hub,
sessionApp: sessionApp,
logger: logger.With().Str("component", "collaboration.handler").Logger(),
}
}

// UpgradeMiddleware checks for WebSocket upgrade and validates the JWT token
// before upgrading the connection.
func (h *Handler) UpgradeMiddleware() fiber.Handler {
return func(c *fiber.Ctx) error {
if !websocket.IsWebSocketUpgrade(c) {
return fiber.ErrUpgradeRequired
}

h.logger.Debug().Str("event", "ws_upgrade").Str("path", c.Path()).Msg("upgrading to WebSocket")

token := c.Query("token")
if token == "" {
return c.Status(fiber.StatusUnauthorized).JSON(fiber.Map{"error": "missing token"})
}

result, err := h.sessionApp.ValidateToken(dto.ValidateTokenInput{Token: token})
if err != nil {
h.logger.Warn().Err(err).Msg("invalid token on websocket upgrade")
return c.Status(fiber.StatusUnauthorized).JSON(fiber.Map{"error": "invalid token"})
}

// Store auth context and path in locals for the WebSocket handler
c.Locals("user_id", result.AuthContext.UserID)
c.Locals("path", c.Path())

return c.Next()
}
}

// WebSocketHandler returns the Fiber WebSocket handler for collaboration.
func (h *Handler) WebSocketHandler() fiber.Handler {
return websocket.New(func(c *websocket.Conn) {
// Extract room ID from path: /ws/collab/<roomId>
roomID := strings.TrimPrefix(c.Locals("path").(string), wsCollabPrefix)
userID, _ := c.Locals("user_id").(string)

h.logger.Debug().Str("event", "ws_connection").Str("room_id", roomID).Str("user_id", userID).Msg("new WebSocket connection")

if roomID == "" {
h.logger.Warn().Msg("empty room id")
return
}

room := h.hub.GetOrCreateRoom(roomID)
client := &Client{
UserID: userID,
}

room.AddClient(c, client)
defer func() {
room.RemoveClient(c)
h.hub.RemoveRoomIfEmpty(roomID)
}()

// Read loop: relay all binary messages to other clients in the room
for {
messageType, msg, err := c.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
h.logger.Warn().Err(err).Str("room_id", roomID).Str("user_id", userID).Msg("unexpected close")
}
break
}

// Only relay binary messages (Y.js protocol)
if messageType == websocket.BinaryMessage {
room.Broadcast(c, msg)
}
}
})
}
73 changes: 73 additions & 0 deletions infrastructure/collaboration/hub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package collaboration

import (
"sync"

"github.com/rs/zerolog"
)

// Hub manages all collaboration rooms.
type Hub struct {
mu sync.RWMutex
rooms map[string]*Room
logger zerolog.Logger
}

// NewHub creates a new collaboration hub.
func NewHub(logger zerolog.Logger) *Hub {
return &Hub{
rooms: make(map[string]*Room),
logger: logger.With().Str("component", "collaboration.hub").Logger(),
}
}

// GetOrCreateRoom returns an existing room or creates a new one.
func (h *Hub) GetOrCreateRoom(roomID string) *Room {
h.mu.RLock()
room, ok := h.rooms[roomID]
h.mu.RUnlock()
if ok {
return room
}

h.mu.Lock()
defer h.mu.Unlock()

// Double-check after acquiring write lock
if room, ok = h.rooms[roomID]; ok {
return room
}

room = newRoom(roomID, h.logger)
h.rooms[roomID] = room
h.logger.Info().Str("room_id", roomID).Msg("room created")
return room
}

// RemoveRoomIfEmpty removes a room if it has no more clients.
func (h *Hub) RemoveRoomIfEmpty(roomID string) {
h.mu.Lock()
defer h.mu.Unlock()

room, ok := h.rooms[roomID]
if !ok {
return
}

if room.ClientCount() == 0 {
delete(h.rooms, roomID)
h.logger.Info().Str("room_id", roomID).Msg("room removed (empty)")
}
}

// Stats returns the number of active rooms and total clients.
func (h *Hub) Stats() (rooms int, clients int) {
h.mu.RLock()
defer h.mu.RUnlock()

rooms = len(h.rooms)
for _, r := range h.rooms {
clients += r.ClientCount()
}
return
}
88 changes: 88 additions & 0 deletions infrastructure/collaboration/room.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package collaboration

import (
"sync"

"github.com/gofiber/websocket/v2"
"github.com/rs/zerolog"
)

// Room represents a Y.js collaboration room.
// It acts as a pure relay: binary messages from one client are broadcast to all others.
type Room struct {
id string
mu sync.RWMutex
clients map[*websocket.Conn]*Client
logger zerolog.Logger
}

// Client holds metadata about a connected user.
type Client struct {
UserID string
Username string
writeMu sync.Mutex
}

func newRoom(id string, logger zerolog.Logger) *Room {
return &Room{
id: id,
clients: make(map[*websocket.Conn]*Client),
logger: logger.With().Str("component", "collaboration.room").Str("room_id", id).Logger(),
}
}

// AddClient registers a new WebSocket connection in the room.
func (r *Room) AddClient(conn *websocket.Conn, client *Client) {
r.mu.Lock()
defer r.mu.Unlock()
r.clients[conn] = client
r.logger.Info().Str("user_id", client.UserID).Int("clients", len(r.clients)).Msg("client joined")
}

// RemoveClient unregisters a WebSocket connection from the room.
func (r *Room) RemoveClient(conn *websocket.Conn) {
r.mu.Lock()
client, ok := r.clients[conn]
if ok {
delete(r.clients, conn)
}
count := len(r.clients)
r.mu.Unlock()

if ok {
r.logger.Info().Str("user_id", client.UserID).Int("clients", count).Msg("client left")
}
}

// Broadcast sends a binary message to all clients except the sender.
func (r *Room) Broadcast(sender *websocket.Conn, msg []byte) {
// Snapshot targets under read lock to avoid holding the lock during IO.
r.mu.RLock()
type target struct {
conn *websocket.Conn
client *Client
}
targets := make([]target, 0, len(r.clients))
for conn, client := range r.clients {
if conn != sender {
targets = append(targets, target{conn, client})
}
}
r.mu.RUnlock()

for _, t := range targets {
t.client.writeMu.Lock()
err := t.conn.WriteMessage(websocket.BinaryMessage, msg)
t.client.writeMu.Unlock()
if err != nil {
r.logger.Warn().Err(err).Msg("failed to write to client")
}
}
}

// ClientCount returns the number of connected clients.
func (r *Room) ClientCount() int {
r.mu.RLock()
defer r.mu.RUnlock()
return len(r.clients)
}
3 changes: 3 additions & 0 deletions infrastructure/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/labbs/nexo/application/user"
"github.com/labbs/nexo/application/webhook"
"github.com/labbs/nexo/domain"
"github.com/labbs/nexo/infrastructure/collaboration"
"github.com/labbs/nexo/infrastructure/config"
"github.com/labbs/nexo/infrastructure/cronscheduler"
"github.com/labbs/nexo/infrastructure/database"
Expand Down Expand Up @@ -43,4 +44,6 @@ type Deps struct {
FavoriteApplication *favorite.FavoriteApplication
PermissionApplication *permission.PermissionApplication
PermissionPers domain.PermissionPers

CollaborationHub *collaboration.Hub
}
5 changes: 5 additions & 0 deletions infrastructure/static/static.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func NewStatic(f *fiber.App) {
return c.Next()
}

// Skip WebSocket routes
if strings.HasPrefix(path, "/ws") {
return c.Next()
}

// Serve index.html from the embedded FS for all other routes (SPA routes)
indexFile, err := embedDirStatic.ReadFile("files/index.html")
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions interfaces/cli/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/labbs/nexo/application/user"
"github.com/labbs/nexo/application/webhook"
"github.com/labbs/nexo/infrastructure"
"github.com/labbs/nexo/infrastructure/collaboration"
"github.com/labbs/nexo/infrastructure/config"
"github.com/labbs/nexo/infrastructure/cronscheduler"
"github.com/labbs/nexo/infrastructure/database"
Expand Down Expand Up @@ -145,6 +146,9 @@ func runServer(cfg config.Config) error {
deps.SessionApplication.DatabaseApplication = deps.DatabaseApplication
deps.SessionApplication.DrawingApplication = deps.DrawingApplication

// Initialize collaboration hub
deps.CollaborationHub = collaboration.NewHub(deps.Logger)

// Initialize HTTP server (fiber + fiberoapi)
deps.Http, err = http.Configure(deps.Config, deps.Logger, deps.SessionApplication, true)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions interfaces/http/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"github.com/labbs/nexo/infrastructure"
"github.com/labbs/nexo/infrastructure/collaboration"
v1 "github.com/labbs/nexo/interfaces/http/v1"
)

Expand All @@ -14,4 +15,21 @@ func SetupRoutes(deps infrastructure.Deps) {

// Setup v1 routes
v1.SetupRouterV1(deps)

// Setup WebSocket collaboration route
setupCollaborationRoutes(deps)
}

func setupCollaborationRoutes(deps infrastructure.Deps) {
logger := deps.Logger.With().Str("component", "http.router.collaboration").Logger()
logger.Info().Str("event", "setup_collaboration_routes").Msg("Setting up collaboration WebSocket routes")
handler := collaboration.NewHandler(deps.CollaborationHub, deps.SessionApplication, deps.Logger)

// The frontend connects to ws://<host>/<roomId>?token=<jwt>
// Room formats: "document:<docId>" or "row:<databaseId>:<rowId>"
// Use("/ws/collab") is a prefix match, Get uses wildcard for the room ID (contains colons)
deps.Http.Fiber.Use("/ws/collab", handler.UpgradeMiddleware())
deps.Http.Fiber.Get("/ws/collab/+", handler.WebSocketHandler())

logger.Debug().Interface("paths", deps.Http.Fiber.GetRoutes()).Msg("Registered HTTP routes")
}
Loading