A community-based topic aggregation platform built on atproto
1package main 2 3import ( 4 "Coves/internal/api/handlers/oauth" 5 "Coves/internal/api/middleware" 6 "Coves/internal/api/routes" 7 "Coves/internal/atproto/identity" 8 "Coves/internal/atproto/jetstream" 9 "Coves/internal/core/communities" 10 "Coves/internal/core/users" 11 "bytes" 12 "context" 13 "database/sql" 14 "encoding/json" 15 "fmt" 16 "io" 17 "log" 18 "net/http" 19 "os" 20 "strings" 21 "time" 22 23 "github.com/go-chi/chi/v5" 24 chiMiddleware "github.com/go-chi/chi/v5/middleware" 25 _ "github.com/lib/pq" 26 "github.com/pressly/goose/v3" 27 28 oauthCore "Coves/internal/core/oauth" 29 30 postgresRepo "Coves/internal/db/postgres" 31) 32 33func main() { 34 // Database configuration (AppView database) 35 dbURL := os.Getenv("DATABASE_URL") 36 if dbURL == "" { 37 // Use dev database from .env.dev 38 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 39 } 40 41 // Default PDS URL for this Coves instance (supports self-hosting) 42 defaultPDS := os.Getenv("PDS_URL") 43 if defaultPDS == "" { 44 defaultPDS = "http://localhost:3001" // Local dev PDS 45 } 46 47 db, err := sql.Open("postgres", dbURL) 48 if err != nil { 49 log.Fatal("Failed to connect to database:", err) 50 } 51 defer func() { 52 if closeErr := db.Close(); closeErr != nil { 53 log.Printf("Failed to close database connection: %v", closeErr) 54 } 55 }() 56 57 if err = db.Ping(); err != nil { 58 log.Fatal("Failed to ping database:", err) 59 } 60 61 log.Println("Connected to AppView database") 62 63 // Run migrations 64 if err = goose.SetDialect("postgres"); err != nil { 65 log.Fatal("Failed to set goose dialect:", err) 66 } 67 68 if err = goose.Up(db, "internal/db/migrations"); err != nil { 69 log.Fatal("Failed to run migrations:", err) 70 } 71 72 log.Println("Migrations completed successfully") 73 74 r := chi.NewRouter() 75 76 r.Use(chiMiddleware.Logger) 77 r.Use(chiMiddleware.Recoverer) 78 r.Use(chiMiddleware.RequestID) 79 80 // Rate limiting: 100 requests per minute per IP 81 
rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 82 r.Use(rateLimiter.Middleware) 83 84 // Initialize identity resolver 85 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 86 // directory as DID registration to ensure E2E tests work without hitting 87 // the production plc.directory 88 identityConfig := identity.DefaultConfig() 89 90 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 91 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 92 if plcDirectoryURL == "" { 93 plcDirectoryURL = "https://plc.directory" // Default to production PLC 94 } 95 96 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 97 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 98 if isDevEnv { 99 identityConfig.PLCURL = plcDirectoryURL 100 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 101 } else { 102 // Production: Allow separate IDENTITY_PLC_URL for read operations 103 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" { 104 identityConfig.PLCURL = identityPLCURL 105 } else { 106 identityConfig.PLCURL = plcDirectoryURL 107 } 108 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 109 } 110 111 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 112 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 113 identityConfig.CacheTTL = duration 114 } 115 } 116 117 identityResolver := identity.NewResolver(db, identityConfig) 118 119 // Initialize OAuth session store 120 sessionStore := oauthCore.NewPostgresSessionStore(db) 121 log.Println("OAuth session store initialized") 122 123 // Initialize repositories and services 124 userRepo := postgresRepo.NewUserRepository(db) 125 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 126 127 communityRepo := postgresRepo.NewCommunityRepository(db) 128 129 // V2.0: PDS-managed DID generation 130 // Community DIDs and keys are generated 
entirely by the PDS 131 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 132 133 instanceDID := os.Getenv("INSTANCE_DID") 134 if instanceDID == "" { 135 instanceDID = "did:web:coves.social" // Default for development 136 } 137 138 // V2: Extract instance domain for community handles 139 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 140 // We cannot allow arbitrary domains to prevent impersonation attacks 141 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance 142 // 143 // TODO (Security - V2.1): Implement did:web domain verification 144 // Currently, any self-hoster can set INSTANCE_DID=did:web:nintendo.com without 145 // actually owning nintendo.com. This allows domain impersonation attacks. 146 // Solution: Verify domain ownership by fetching https://domain/.well-known/did.json 147 // and ensuring it matches the claimed DID. See: https://atproto.com/specs/did-web 148 // Alternatively, switch to did:plc for instance DIDs (cryptographically unique). 
149 var instanceDomain string 150 if strings.HasPrefix(instanceDID, "did:web:") { 151 // Extract domain from did:web (this is the authoritative source) 152 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 153 } else { 154 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 155 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 156 if instanceDomain == "" { 157 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 158 } 159 } 160 161 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 162 163 // V2.0: Initialize PDS account provisioner for communities (simplified) 164 // PDS handles all DID and key generation - no Coves-side cryptography needed 165 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 166 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 167 log.Printf(" - Communities will be created at: %s", defaultPDS) 168 log.Printf(" - PDS will generate and manage all DIDs and keys") 169 170 // Initialize community service (no longer needs didGenerator directly) 171 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 172 173 // Authenticate Coves instance with PDS to enable community record writes 174 // The instance needs a PDS account to write community records it owns 175 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 176 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 177 if pdsHandle != "" && pdsPassword != "" { 178 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 179 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 180 if authErr != nil { 181 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 182 log.Println("Community creation will fail until PDS authentication is configured") 183 } else { 184 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 185 
svc.SetPDSAccessToken(accessToken) 186 log.Println("✓ Coves instance authenticated with PDS") 187 } 188 } 189 } else { 190 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 191 log.Println("Community creation via write-forward is disabled") 192 } 193 194 // Start Jetstream consumer for read-forward user indexing 195 jetstreamURL := os.Getenv("JETSTREAM_URL") 196 if jetstreamURL == "" { 197 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 198 } 199 200 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 201 202 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 203 ctx := context.Background() 204 go func() { 205 if startErr := userConsumer.Start(ctx); startErr != nil { 206 log.Printf("Jetstream consumer stopped: %v", startErr) 207 } 208 }() 209 210 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 211 212 // Note: Community indexing happens through the same Jetstream firehose 213 // The CommunityEventConsumer is used by handlers when processing community-related events 214 // For now, community records are created via write-forward to PDS, then indexed when 215 // they appear in the firehose. A dedicated consumer can be added later if needed. 
216 log.Println("Community event consumer initialized (processes events from firehose)") 217 218 // Start OAuth cleanup background job 219 go func() { 220 ticker := time.NewTicker(1 * time.Hour) 221 defer ticker.Stop() 222 for range ticker.C { 223 if pgStore, ok := sessionStore.(*oauthCore.PostgresSessionStore); ok { 224 if cleanupErr := pgStore.CleanupExpiredRequests(ctx); cleanupErr != nil { 225 log.Printf("Failed to cleanup expired OAuth requests: %v", cleanupErr) 226 } 227 if cleanupErr := pgStore.CleanupExpiredSessions(ctx); cleanupErr != nil { 228 log.Printf("Failed to cleanup expired OAuth sessions: %v", cleanupErr) 229 } 230 log.Println("OAuth cleanup completed") 231 } 232 } 233 }() 234 235 log.Println("Started OAuth cleanup background job (runs hourly)") 236 237 // Initialize OAuth cookie store (singleton) 238 cookieSecret, err := oauth.GetEnvBase64OrPlain("OAUTH_COOKIE_SECRET") 239 if err != nil { 240 log.Fatalf("Failed to load OAUTH_COOKIE_SECRET: %v", err) 241 } 242 if cookieSecret == "" { 243 log.Fatal("OAUTH_COOKIE_SECRET not configured") 244 } 245 246 if err := oauth.InitCookieStore(cookieSecret); err != nil { 247 log.Fatalf("Failed to initialize cookie store: %v", err) 248 } 249 250 // Initialize OAuth handlers 251 loginHandler := oauth.NewLoginHandler(identityResolver, sessionStore) 252 callbackHandler := oauth.NewCallbackHandler(sessionStore) 253 logoutHandler := oauth.NewLogoutHandler(sessionStore) 254 255 // OAuth routes (public endpoints) 256 r.Post("/oauth/login", loginHandler.HandleLogin) 257 r.Get("/oauth/callback", callbackHandler.HandleCallback) 258 r.Post("/oauth/logout", logoutHandler.HandleLogout) 259 r.Get("/oauth/client-metadata.json", oauth.HandleClientMetadata) 260 r.Get("/oauth/jwks.json", oauth.HandleJWKS) 261 262 log.Println("OAuth endpoints registered") 263 264 // Register XRPC routes 265 routes.RegisterUserRoutes(r, userService) 266 routes.RegisterCommunityRoutes(r, communityService) 267 log.Println("Community XRPC endpoints 
registered") 268 269 r.Get("/health", func(w http.ResponseWriter, r *http.Request) { 270 w.WriteHeader(http.StatusOK) 271 if _, err := w.Write([]byte("OK")); err != nil { 272 log.Printf("Failed to write health check response: %v", err) 273 } 274 }) 275 276 port := os.Getenv("APPVIEW_PORT") 277 if port == "" { 278 port = "8081" // Match .env.dev default 279 } 280 281 fmt.Printf("Coves AppView starting on port %s\n", port) 282 fmt.Printf("Default PDS: %s\n", defaultPDS) 283 log.Fatal(http.ListenAndServe(":"+port, r)) 284} 285 286// authenticateWithPDS creates a session on the PDS and returns an access token 287func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 288 type CreateSessionRequest struct { 289 Identifier string `json:"identifier"` 290 Password string `json:"password"` 291 } 292 293 type CreateSessionResponse struct { 294 DID string `json:"did"` 295 Handle string `json:"handle"` 296 AccessJwt string `json:"accessJwt"` 297 } 298 299 reqBody, err := json.Marshal(CreateSessionRequest{ 300 Identifier: handle, 301 Password: password, 302 }) 303 if err != nil { 304 return "", fmt.Errorf("failed to marshal request: %w", err) 305 } 306 307 resp, err := http.Post( 308 pdsURL+"/xrpc/com.atproto.server.createSession", 309 "application/json", 310 bytes.NewReader(reqBody), 311 ) 312 if err != nil { 313 return "", fmt.Errorf("failed to call PDS: %w", err) 314 } 315 defer func() { 316 if closeErr := resp.Body.Close(); closeErr != nil { 317 log.Printf("Failed to close response body: %v", closeErr) 318 } 319 }() 320 321 if resp.StatusCode != http.StatusOK { 322 body, readErr := io.ReadAll(resp.Body) 323 if readErr != nil { 324 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 325 } 326 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 327 } 328 329 var session CreateSessionResponse 330 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 331 return "", 
fmt.Errorf("failed to decode response: %w", err) 332 } 333 334 return session.AccessJwt, nil 335}