A community-based topic aggregation platform built on atproto
1package main 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/api/routes" 6 "Coves/internal/atproto/auth" 7 "Coves/internal/atproto/identity" 8 "Coves/internal/atproto/jetstream" 9 "Coves/internal/core/communities" 10 "Coves/internal/core/users" 11 "bytes" 12 "context" 13 "database/sql" 14 "encoding/json" 15 "fmt" 16 "io" 17 "log" 18 "net/http" 19 "os" 20 "strings" 21 "time" 22 23 "github.com/go-chi/chi/v5" 24 chiMiddleware "github.com/go-chi/chi/v5/middleware" 25 _ "github.com/lib/pq" 26 "github.com/pressly/goose/v3" 27 28 postgresRepo "Coves/internal/db/postgres" 29) 30 31func main() { 32 // Database configuration (AppView database) 33 dbURL := os.Getenv("DATABASE_URL") 34 if dbURL == "" { 35 // Use dev database from .env.dev 36 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 37 } 38 39 // Default PDS URL for this Coves instance (supports self-hosting) 40 defaultPDS := os.Getenv("PDS_URL") 41 if defaultPDS == "" { 42 defaultPDS = "http://localhost:3001" // Local dev PDS 43 } 44 45 db, err := sql.Open("postgres", dbURL) 46 if err != nil { 47 log.Fatal("Failed to connect to database:", err) 48 } 49 defer func() { 50 if closeErr := db.Close(); closeErr != nil { 51 log.Printf("Failed to close database connection: %v", closeErr) 52 } 53 }() 54 55 if err = db.Ping(); err != nil { 56 log.Fatal("Failed to ping database:", err) 57 } 58 59 log.Println("Connected to AppView database") 60 61 // Run migrations 62 if err = goose.SetDialect("postgres"); err != nil { 63 log.Fatal("Failed to set goose dialect:", err) 64 } 65 66 if err = goose.Up(db, "internal/db/migrations"); err != nil { 67 log.Fatal("Failed to run migrations:", err) 68 } 69 70 log.Println("Migrations completed successfully") 71 72 r := chi.NewRouter() 73 74 r.Use(chiMiddleware.Logger) 75 r.Use(chiMiddleware.Recoverer) 76 r.Use(chiMiddleware.RequestID) 77 78 // Rate limiting: 100 requests per minute per IP 79 rateLimiter := middleware.NewRateLimiter(100, 
1*time.Minute) 80 r.Use(rateLimiter.Middleware) 81 82 // Initialize identity resolver 83 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 84 // directory as DID registration to ensure E2E tests work without hitting 85 // the production plc.directory 86 identityConfig := identity.DefaultConfig() 87 88 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 89 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 90 if plcDirectoryURL == "" { 91 plcDirectoryURL = "https://plc.directory" // Default to production PLC 92 } 93 94 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 95 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 96 if isDevEnv { 97 identityConfig.PLCURL = plcDirectoryURL 98 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 99 } else { 100 // Production: Allow separate IDENTITY_PLC_URL for read operations 101 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" { 102 identityConfig.PLCURL = identityPLCURL 103 } else { 104 identityConfig.PLCURL = plcDirectoryURL 105 } 106 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 107 } 108 109 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 110 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 111 identityConfig.CacheTTL = duration 112 } 113 } 114 115 identityResolver := identity.NewResolver(db, identityConfig) 116 117 // Initialize atProto auth middleware for JWT validation 118 // Phase 1: Set skipVerify=true to test JWT parsing only 119 // Phase 2: Set skipVerify=false to enable full signature verification 120 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true" 121 if skipVerify { 122 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)") 123 log.Println(" Set AUTH_SKIP_VERIFY=false for production") 124 } 125 126 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour 127 jwksFetcher := 
auth.NewCachedJWKSFetcher(jwksCacheTTL) 128 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify) 129 log.Println("✅ atProto auth middleware initialized") 130 131 // Initialize repositories and services 132 userRepo := postgresRepo.NewUserRepository(db) 133 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 134 135 communityRepo := postgresRepo.NewCommunityRepository(db) 136 137 // V2.0: PDS-managed DID generation 138 // Community DIDs and keys are generated entirely by the PDS 139 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 140 141 instanceDID := os.Getenv("INSTANCE_DID") 142 if instanceDID == "" { 143 instanceDID = "did:web:coves.social" // Default for development 144 } 145 146 // V2: Extract instance domain for community handles 147 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 148 // We cannot allow arbitrary domains to prevent impersonation attacks 149 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance 150 // 151 // TODO (Security - V2.1): Implement did:web domain verification 152 // Currently, any self-hoster can set INSTANCE_DID=did:web:nintendo.com without 153 // actually owning nintendo.com. This allows domain impersonation attacks. 154 // Solution: Verify domain ownership by fetching https://domain/.well-known/did.json 155 // and ensuring it matches the claimed DID. See: https://atproto.com/specs/did-web 156 // Alternatively, switch to did:plc for instance DIDs (cryptographically unique). 
157 var instanceDomain string 158 if strings.HasPrefix(instanceDID, "did:web:") { 159 // Extract domain from did:web (this is the authoritative source) 160 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 161 } else { 162 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 163 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 164 if instanceDomain == "" { 165 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 166 } 167 } 168 169 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 170 171 // V2.0: Initialize PDS account provisioner for communities (simplified) 172 // PDS handles all DID and key generation - no Coves-side cryptography needed 173 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 174 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 175 log.Printf(" - Communities will be created at: %s", defaultPDS) 176 log.Printf(" - PDS will generate and manage all DIDs and keys") 177 178 // Initialize community service (no longer needs didGenerator directly) 179 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 180 181 // Authenticate Coves instance with PDS to enable community record writes 182 // The instance needs a PDS account to write community records it owns 183 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 184 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 185 if pdsHandle != "" && pdsPassword != "" { 186 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 187 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 188 if authErr != nil { 189 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 190 log.Println("Community creation will fail until PDS authentication is configured") 191 } else { 192 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 193 
svc.SetPDSAccessToken(accessToken) 194 log.Println("✓ Coves instance authenticated with PDS") 195 } 196 } 197 } else { 198 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 199 log.Println("Community creation via write-forward is disabled") 200 } 201 202 // Start Jetstream consumer for read-forward user indexing 203 jetstreamURL := os.Getenv("JETSTREAM_URL") 204 if jetstreamURL == "" { 205 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 206 } 207 208 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 209 210 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 211 ctx := context.Background() 212 go func() { 213 if startErr := userConsumer.Start(ctx); startErr != nil { 214 log.Printf("Jetstream consumer stopped: %v", startErr) 215 } 216 }() 217 218 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 219 220 // Start Jetstream consumer for community events (profiles and subscriptions) 221 // This consumer indexes: 222 // 1. Community profiles (social.coves.community.profile) - in community's own repo 223 // 2. 
User subscriptions (social.coves.community.subscription) - in user's repo 224 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL") 225 if communityJetstreamURL == "" { 226 // Local Jetstream for communities - filter to our instance's collections 227 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe) 228 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures 229 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription" 230 } 231 232 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo) 233 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL) 234 235 go func() { 236 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil { 237 log.Printf("Community Jetstream consumer stopped: %v", startErr) 238 } 239 }() 240 241 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL) 242 log.Println(" - Indexing: social.coves.community.profile (community profiles)") 243 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)") 244 245 // Start JWKS cache cleanup background job 246 go func() { 247 ticker := time.NewTicker(1 * time.Hour) 248 defer ticker.Stop() 249 for range ticker.C { 250 jwksFetcher.CleanupExpiredCache() 251 log.Println("JWKS cache cleanup completed") 252 } 253 }() 254 255 log.Println("Started JWKS cache cleanup background job (runs hourly)") 256 257 // Register XRPC routes 258 routes.RegisterUserRoutes(r, userService) 259 routes.RegisterCommunityRoutes(r, communityService, authMiddleware) 260 log.Println("Community XRPC endpoints registered with OAuth authentication") 261 262 r.Get("/health", func(w http.ResponseWriter, r *http.Request) { 263 w.WriteHeader(http.StatusOK) 264 if _, err := w.Write([]byte("OK")); err != nil 
{ 265 log.Printf("Failed to write health check response: %v", err) 266 } 267 }) 268 269 port := os.Getenv("APPVIEW_PORT") 270 if port == "" { 271 port = "8081" // Match .env.dev default 272 } 273 274 fmt.Printf("Coves AppView starting on port %s\n", port) 275 fmt.Printf("Default PDS: %s\n", defaultPDS) 276 log.Fatal(http.ListenAndServe(":"+port, r)) 277} 278 279// authenticateWithPDS creates a session on the PDS and returns an access token 280func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 281 type CreateSessionRequest struct { 282 Identifier string `json:"identifier"` 283 Password string `json:"password"` 284 } 285 286 type CreateSessionResponse struct { 287 DID string `json:"did"` 288 Handle string `json:"handle"` 289 AccessJwt string `json:"accessJwt"` 290 } 291 292 reqBody, err := json.Marshal(CreateSessionRequest{ 293 Identifier: handle, 294 Password: password, 295 }) 296 if err != nil { 297 return "", fmt.Errorf("failed to marshal request: %w", err) 298 } 299 300 resp, err := http.Post( 301 pdsURL+"/xrpc/com.atproto.server.createSession", 302 "application/json", 303 bytes.NewReader(reqBody), 304 ) 305 if err != nil { 306 return "", fmt.Errorf("failed to call PDS: %w", err) 307 } 308 defer func() { 309 if closeErr := resp.Body.Close(); closeErr != nil { 310 log.Printf("Failed to close response body: %v", closeErr) 311 } 312 }() 313 314 if resp.StatusCode != http.StatusOK { 315 body, readErr := io.ReadAll(resp.Body) 316 if readErr != nil { 317 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 318 } 319 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 320 } 321 322 var session CreateSessionResponse 323 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 324 return "", fmt.Errorf("failed to decode response: %w", err) 325 } 326 327 return session.AccessJwt, nil 328}