A community-based topic aggregation platform built on atproto
1package main 2 3import ( 4 "Coves/internal/api/handlers/oauth" 5 "Coves/internal/api/middleware" 6 "Coves/internal/api/routes" 7 "Coves/internal/atproto/did" 8 "Coves/internal/atproto/identity" 9 "Coves/internal/atproto/jetstream" 10 "Coves/internal/core/communities" 11 "Coves/internal/core/users" 12 "bytes" 13 "context" 14 "database/sql" 15 "encoding/json" 16 "fmt" 17 "io" 18 "log" 19 "net/http" 20 "os" 21 "strings" 22 "time" 23 24 "github.com/go-chi/chi/v5" 25 chiMiddleware "github.com/go-chi/chi/v5/middleware" 26 _ "github.com/lib/pq" 27 "github.com/pressly/goose/v3" 28 29 oauthCore "Coves/internal/core/oauth" 30 31 postgresRepo "Coves/internal/db/postgres" 32) 33 34func main() { 35 // Database configuration (AppView database) 36 dbURL := os.Getenv("DATABASE_URL") 37 if dbURL == "" { 38 // Use dev database from .env.dev 39 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 40 } 41 42 // Default PDS URL for this Coves instance (supports self-hosting) 43 defaultPDS := os.Getenv("PDS_URL") 44 if defaultPDS == "" { 45 defaultPDS = "http://localhost:3001" // Local dev PDS 46 } 47 48 db, err := sql.Open("postgres", dbURL) 49 if err != nil { 50 log.Fatal("Failed to connect to database:", err) 51 } 52 defer func() { 53 if closeErr := db.Close(); closeErr != nil { 54 log.Printf("Failed to close database connection: %v", closeErr) 55 } 56 }() 57 58 if err = db.Ping(); err != nil { 59 log.Fatal("Failed to ping database:", err) 60 } 61 62 log.Println("Connected to AppView database") 63 64 // Run migrations 65 if err = goose.SetDialect("postgres"); err != nil { 66 log.Fatal("Failed to set goose dialect:", err) 67 } 68 69 if err = goose.Up(db, "internal/db/migrations"); err != nil { 70 log.Fatal("Failed to run migrations:", err) 71 } 72 73 log.Println("Migrations completed successfully") 74 75 r := chi.NewRouter() 76 77 r.Use(chiMiddleware.Logger) 78 r.Use(chiMiddleware.Recoverer) 79 r.Use(chiMiddleware.RequestID) 80 81 // Rate limiting: 100 
requests per minute per IP 82 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 83 r.Use(rateLimiter.Middleware) 84 85 // Initialize identity resolver 86 identityConfig := identity.DefaultConfig() 87 // Override from environment if set 88 if plcURL := os.Getenv("IDENTITY_PLC_URL"); plcURL != "" { 89 identityConfig.PLCURL = plcURL 90 } 91 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 92 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 93 identityConfig.CacheTTL = duration 94 } 95 } 96 97 identityResolver := identity.NewResolver(db, identityConfig) 98 log.Println("Identity resolver initialized with PLC:", identityConfig.PLCURL) 99 100 // Initialize OAuth session store 101 sessionStore := oauthCore.NewPostgresSessionStore(db) 102 log.Println("OAuth session store initialized") 103 104 // Initialize repositories and services 105 userRepo := postgresRepo.NewUserRepository(db) 106 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 107 108 communityRepo := postgresRepo.NewCommunityRepository(db) 109 110 // Initialize DID generator for communities 111 // IS_DEV_ENV=true: Generate did:plc:xxx without registering to PLC directory 112 // IS_DEV_ENV=false: Generate did:plc:xxx and register with PLC_DIRECTORY_URL 113 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 114 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 115 if plcDirectoryURL == "" { 116 plcDirectoryURL = "https://plc.directory" // Default to Bluesky's PLC 117 } 118 didGenerator := did.NewGenerator(isDevEnv, plcDirectoryURL) 119 log.Printf("DID generator initialized (dev_mode=%v, plc_url=%s)", isDevEnv, plcDirectoryURL) 120 121 instanceDID := os.Getenv("INSTANCE_DID") 122 if instanceDID == "" { 123 instanceDID = "did:web:coves.social" // Default for development 124 } 125 126 // V2: Extract instance domain for community handles 127 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 128 // We cannot allow arbitrary domains 
to prevent impersonation attacks 129 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance 130 // 131 // TODO (Security - V2.1): Implement did:web domain verification 132 // Currently, any self-hoster can set INSTANCE_DID=did:web:nintendo.com without 133 // actually owning nintendo.com. This allows domain impersonation attacks. 134 // Solution: Verify domain ownership by fetching https://domain/.well-known/did.json 135 // and ensuring it matches the claimed DID. See: https://atproto.com/specs/did-web 136 // Alternatively, switch to did:plc for instance DIDs (cryptographically unique). 137 var instanceDomain string 138 if strings.HasPrefix(instanceDID, "did:web:") { 139 // Extract domain from did:web (this is the authoritative source) 140 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 141 } else { 142 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 143 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 144 if instanceDomain == "" { 145 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 146 } 147 } 148 149 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 150 151 // V2: Initialize PDS account provisioner for communities 152 provisioner := communities.NewPDSAccountProvisioner(userService, instanceDomain, defaultPDS) 153 154 communityService := communities.NewCommunityService(communityRepo, didGenerator, defaultPDS, instanceDID, instanceDomain, provisioner) 155 156 // Authenticate Coves instance with PDS to enable community record writes 157 // The instance needs a PDS account to write community records it owns 158 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 159 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 160 if pdsHandle != "" && pdsPassword != "" { 161 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 162 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 163 if authErr != nil { 164 log.Printf("Warning: Failed to 
authenticate with PDS: %v", authErr) 165 log.Println("Community creation will fail until PDS authentication is configured") 166 } else { 167 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 168 svc.SetPDSAccessToken(accessToken) 169 log.Println("✓ Coves instance authenticated with PDS") 170 } 171 } 172 } else { 173 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 174 log.Println("Community creation via write-forward is disabled") 175 } 176 177 // Start Jetstream consumer for read-forward user indexing 178 jetstreamURL := os.Getenv("JETSTREAM_URL") 179 if jetstreamURL == "" { 180 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 181 } 182 183 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 184 185 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 186 ctx := context.Background() 187 go func() { 188 if startErr := userConsumer.Start(ctx); startErr != nil { 189 log.Printf("Jetstream consumer stopped: %v", startErr) 190 } 191 }() 192 193 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 194 195 // Note: Community indexing happens through the same Jetstream firehose 196 // The CommunityEventConsumer is used by handlers when processing community-related events 197 // For now, community records are created via write-forward to PDS, then indexed when 198 // they appear in the firehose. A dedicated consumer can be added later if needed. 
199 log.Println("Community event consumer initialized (processes events from firehose)") 200 201 // Start OAuth cleanup background job 202 go func() { 203 ticker := time.NewTicker(1 * time.Hour) 204 defer ticker.Stop() 205 for range ticker.C { 206 if pgStore, ok := sessionStore.(*oauthCore.PostgresSessionStore); ok { 207 if cleanupErr := pgStore.CleanupExpiredRequests(ctx); cleanupErr != nil { 208 log.Printf("Failed to cleanup expired OAuth requests: %v", cleanupErr) 209 } 210 if cleanupErr := pgStore.CleanupExpiredSessions(ctx); cleanupErr != nil { 211 log.Printf("Failed to cleanup expired OAuth sessions: %v", cleanupErr) 212 } 213 log.Println("OAuth cleanup completed") 214 } 215 } 216 }() 217 218 log.Println("Started OAuth cleanup background job (runs hourly)") 219 220 // Initialize OAuth cookie store (singleton) 221 cookieSecret, err := oauth.GetEnvBase64OrPlain("OAUTH_COOKIE_SECRET") 222 if err != nil { 223 log.Fatalf("Failed to load OAUTH_COOKIE_SECRET: %v", err) 224 } 225 if cookieSecret == "" { 226 log.Fatal("OAUTH_COOKIE_SECRET not configured") 227 } 228 229 if err := oauth.InitCookieStore(cookieSecret); err != nil { 230 log.Fatalf("Failed to initialize cookie store: %v", err) 231 } 232 233 // Initialize OAuth handlers 234 loginHandler := oauth.NewLoginHandler(identityResolver, sessionStore) 235 callbackHandler := oauth.NewCallbackHandler(sessionStore) 236 logoutHandler := oauth.NewLogoutHandler(sessionStore) 237 238 // OAuth routes (public endpoints) 239 r.Post("/oauth/login", loginHandler.HandleLogin) 240 r.Get("/oauth/callback", callbackHandler.HandleCallback) 241 r.Post("/oauth/logout", logoutHandler.HandleLogout) 242 r.Get("/oauth/client-metadata.json", oauth.HandleClientMetadata) 243 r.Get("/oauth/jwks.json", oauth.HandleJWKS) 244 245 log.Println("OAuth endpoints registered") 246 247 // Register XRPC routes 248 routes.RegisterUserRoutes(r, userService) 249 routes.RegisterCommunityRoutes(r, communityService) 250 log.Println("Community XRPC endpoints 
registered") 251 252 r.Get("/health", func(w http.ResponseWriter, r *http.Request) { 253 w.WriteHeader(http.StatusOK) 254 if _, err := w.Write([]byte("OK")); err != nil { 255 log.Printf("Failed to write health check response: %v", err) 256 } 257 }) 258 259 port := os.Getenv("APPVIEW_PORT") 260 if port == "" { 261 port = "8081" // Match .env.dev default 262 } 263 264 fmt.Printf("Coves AppView starting on port %s\n", port) 265 fmt.Printf("Default PDS: %s\n", defaultPDS) 266 log.Fatal(http.ListenAndServe(":"+port, r)) 267} 268 269// authenticateWithPDS creates a session on the PDS and returns an access token 270func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 271 type CreateSessionRequest struct { 272 Identifier string `json:"identifier"` 273 Password string `json:"password"` 274 } 275 276 type CreateSessionResponse struct { 277 DID string `json:"did"` 278 Handle string `json:"handle"` 279 AccessJwt string `json:"accessJwt"` 280 } 281 282 reqBody, err := json.Marshal(CreateSessionRequest{ 283 Identifier: handle, 284 Password: password, 285 }) 286 if err != nil { 287 return "", fmt.Errorf("failed to marshal request: %w", err) 288 } 289 290 resp, err := http.Post( 291 pdsURL+"/xrpc/com.atproto.server.createSession", 292 "application/json", 293 bytes.NewReader(reqBody), 294 ) 295 if err != nil { 296 return "", fmt.Errorf("failed to call PDS: %w", err) 297 } 298 defer func() { 299 if closeErr := resp.Body.Close(); closeErr != nil { 300 log.Printf("Failed to close response body: %v", closeErr) 301 } 302 }() 303 304 if resp.StatusCode != http.StatusOK { 305 body, readErr := io.ReadAll(resp.Body) 306 if readErr != nil { 307 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 308 } 309 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 310 } 311 312 var session CreateSessionResponse 313 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 314 return "", 
fmt.Errorf("failed to decode response: %w", err) 315 } 316 317 return session.AccessJwt, nil 318}