A community-based topic aggregation platform built on atproto
1package main 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/api/routes" 6 "Coves/internal/atproto/auth" 7 "Coves/internal/atproto/identity" 8 "Coves/internal/atproto/jetstream" 9 "Coves/internal/core/communities" 10 "Coves/internal/core/posts" 11 "Coves/internal/core/users" 12 "bytes" 13 "context" 14 "database/sql" 15 "encoding/json" 16 "fmt" 17 "io" 18 "log" 19 "net/http" 20 "os" 21 "strings" 22 "time" 23 24 "github.com/go-chi/chi/v5" 25 chiMiddleware "github.com/go-chi/chi/v5/middleware" 26 _ "github.com/lib/pq" 27 "github.com/pressly/goose/v3" 28 29 postgresRepo "Coves/internal/db/postgres" 30) 31 32func main() { 33 // Database configuration (AppView database) 34 dbURL := os.Getenv("DATABASE_URL") 35 if dbURL == "" { 36 // Use dev database from .env.dev 37 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 38 } 39 40 // Default PDS URL for this Coves instance (supports self-hosting) 41 defaultPDS := os.Getenv("PDS_URL") 42 if defaultPDS == "" { 43 defaultPDS = "http://localhost:3001" // Local dev PDS 44 } 45 46 db, err := sql.Open("postgres", dbURL) 47 if err != nil { 48 log.Fatal("Failed to connect to database:", err) 49 } 50 defer func() { 51 if closeErr := db.Close(); closeErr != nil { 52 log.Printf("Failed to close database connection: %v", closeErr) 53 } 54 }() 55 56 if err = db.Ping(); err != nil { 57 log.Fatal("Failed to ping database:", err) 58 } 59 60 log.Println("Connected to AppView database") 61 62 // Run migrations 63 if err = goose.SetDialect("postgres"); err != nil { 64 log.Fatal("Failed to set goose dialect:", err) 65 } 66 67 if err = goose.Up(db, "internal/db/migrations"); err != nil { 68 log.Fatal("Failed to run migrations:", err) 69 } 70 71 log.Println("Migrations completed successfully") 72 73 r := chi.NewRouter() 74 75 r.Use(chiMiddleware.Logger) 76 r.Use(chiMiddleware.Recoverer) 77 r.Use(chiMiddleware.RequestID) 78 79 // Rate limiting: 100 requests per minute per IP 80 rateLimiter := 
middleware.NewRateLimiter(100, 1*time.Minute) 81 r.Use(rateLimiter.Middleware) 82 83 // Initialize identity resolver 84 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 85 // directory as DID registration to ensure E2E tests work without hitting 86 // the production plc.directory 87 identityConfig := identity.DefaultConfig() 88 89 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 90 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 91 if plcDirectoryURL == "" { 92 plcDirectoryURL = "https://plc.directory" // Default to production PLC 93 } 94 95 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 96 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 97 if isDevEnv { 98 identityConfig.PLCURL = plcDirectoryURL 99 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 100 } else { 101 // Production: Allow separate IDENTITY_PLC_URL for read operations 102 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" { 103 identityConfig.PLCURL = identityPLCURL 104 } else { 105 identityConfig.PLCURL = plcDirectoryURL 106 } 107 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 108 } 109 110 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 111 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 112 identityConfig.CacheTTL = duration 113 } 114 } 115 116 identityResolver := identity.NewResolver(db, identityConfig) 117 118 // Initialize atProto auth middleware for JWT validation 119 // Phase 1: Set skipVerify=true to test JWT parsing only 120 // Phase 2: Set skipVerify=false to enable full signature verification 121 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true" 122 if skipVerify { 123 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)") 124 log.Println(" Set AUTH_SKIP_VERIFY=false for production") 125 } 126 127 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 
hour 128 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL) 129 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify) 130 log.Println("✅ atProto auth middleware initialized") 131 132 // Initialize repositories and services 133 userRepo := postgresRepo.NewUserRepository(db) 134 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 135 136 communityRepo := postgresRepo.NewCommunityRepository(db) 137 138 // V2.0: PDS-managed DID generation 139 // Community DIDs and keys are generated entirely by the PDS 140 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 141 142 instanceDID := os.Getenv("INSTANCE_DID") 143 if instanceDID == "" { 144 instanceDID = "did:web:coves.social" // Default for development 145 } 146 147 // V2: Extract instance domain for community handles 148 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 149 // We cannot allow arbitrary domains to prevent impersonation attacks 150 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance 151 // 152 // SECURITY: did:web domain verification is implemented in the Jetstream consumer 153 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim() 154 // Communities with mismatched hostedBy domains are rejected during indexing 155 var instanceDomain string 156 if strings.HasPrefix(instanceDID, "did:web:") { 157 // Extract domain from did:web (this is the authoritative source) 158 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 159 } else { 160 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 161 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 162 if instanceDomain == "" { 163 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 164 } 165 } 166 167 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 168 169 // V2.0: Initialize PDS account provisioner for communities (simplified) 170 // PDS handles all DID 
and key generation - no Coves-side cryptography needed 171 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 172 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 173 log.Printf(" - Communities will be created at: %s", defaultPDS) 174 log.Printf(" - PDS will generate and manage all DIDs and keys") 175 176 // Initialize community service (no longer needs didGenerator directly) 177 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 178 179 // Authenticate Coves instance with PDS to enable community record writes 180 // The instance needs a PDS account to write community records it owns 181 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 182 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 183 if pdsHandle != "" && pdsPassword != "" { 184 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 185 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 186 if authErr != nil { 187 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 188 log.Println("Community creation will fail until PDS authentication is configured") 189 } else { 190 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 191 svc.SetPDSAccessToken(accessToken) 192 log.Println("✓ Coves instance authenticated with PDS") 193 } 194 } 195 } else { 196 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 197 log.Println("Community creation via write-forward is disabled") 198 } 199 200 // Start Jetstream consumer for read-forward user indexing 201 jetstreamURL := os.Getenv("JETSTREAM_URL") 202 if jetstreamURL == "" { 203 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 204 } 205 206 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 207 208 userConsumer := jetstream.NewUserEventConsumer(userService, 
identityResolver, jetstreamURL, pdsFilter) 209 ctx := context.Background() 210 go func() { 211 if startErr := userConsumer.Start(ctx); startErr != nil { 212 log.Printf("Jetstream consumer stopped: %v", startErr) 213 } 214 }() 215 216 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 217 218 // Start Jetstream consumer for community events (profiles and subscriptions) 219 // This consumer indexes: 220 // 1. Community profiles (social.coves.community.profile) - in community's own repo 221 // 2. User subscriptions (social.coves.community.subscription) - in user's repo 222 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL") 223 if communityJetstreamURL == "" { 224 // Local Jetstream for communities - filter to our instance's collections 225 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe) 226 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures 227 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription" 228 } 229 230 // Initialize community event consumer with did:web verification 231 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true" 232 if skipDIDWebVerification { 233 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)") 234 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production") 235 } 236 237 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification) 238 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL) 239 240 go func() { 241 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil { 242 log.Printf("Community Jetstream consumer stopped: %v", startErr) 243 } 244 }() 245 246 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL) 247 
log.Println(" - Indexing: social.coves.community.profile (community profiles)") 248 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)") 249 250 // Start JWKS cache cleanup background job 251 go func() { 252 ticker := time.NewTicker(1 * time.Hour) 253 defer ticker.Stop() 254 for range ticker.C { 255 jwksFetcher.CleanupExpiredCache() 256 log.Println("JWKS cache cleanup completed") 257 } 258 }() 259 260 log.Println("Started JWKS cache cleanup background job (runs hourly)") 261 262 // Initialize post service 263 postRepo := postgresRepo.NewPostRepository(db) 264 postService := posts.NewPostService(postRepo, communityService, defaultPDS) 265 266 // Start Jetstream consumer for posts 267 // This consumer indexes posts created in community repositories via the firehose 268 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist 269 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL") 270 if postJetstreamURL == "" { 271 // Listen to post record creation events 272 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.post.record" 273 } 274 275 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 276 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL) 277 278 go func() { 279 if startErr := postJetstreamConnector.Start(ctx); startErr != nil { 280 log.Printf("Post Jetstream consumer stopped: %v", startErr) 281 } 282 }() 283 284 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL) 285 log.Println(" - Indexing: social.coves.post.record CREATE operations") 286 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented") 287 288 // Register XRPC routes 289 routes.RegisterUserRoutes(r, userService) 290 routes.RegisterCommunityRoutes(r, communityService, authMiddleware) 291 log.Println("Community XRPC endpoints registered with OAuth authentication") 292 293 
routes.RegisterPostRoutes(r, postService, authMiddleware) 294 log.Println("Post XRPC endpoints registered with OAuth authentication") 295 296 r.Get("/health", func(w http.ResponseWriter, r *http.Request) { 297 w.WriteHeader(http.StatusOK) 298 if _, err := w.Write([]byte("OK")); err != nil { 299 log.Printf("Failed to write health check response: %v", err) 300 } 301 }) 302 303 port := os.Getenv("APPVIEW_PORT") 304 if port == "" { 305 port = "8081" // Match .env.dev default 306 } 307 308 fmt.Printf("Coves AppView starting on port %s\n", port) 309 fmt.Printf("Default PDS: %s\n", defaultPDS) 310 log.Fatal(http.ListenAndServe(":"+port, r)) 311} 312 313// authenticateWithPDS creates a session on the PDS and returns an access token 314func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 315 type CreateSessionRequest struct { 316 Identifier string `json:"identifier"` 317 Password string `json:"password"` 318 } 319 320 type CreateSessionResponse struct { 321 DID string `json:"did"` 322 Handle string `json:"handle"` 323 AccessJwt string `json:"accessJwt"` 324 } 325 326 reqBody, err := json.Marshal(CreateSessionRequest{ 327 Identifier: handle, 328 Password: password, 329 }) 330 if err != nil { 331 return "", fmt.Errorf("failed to marshal request: %w", err) 332 } 333 334 resp, err := http.Post( 335 pdsURL+"/xrpc/com.atproto.server.createSession", 336 "application/json", 337 bytes.NewReader(reqBody), 338 ) 339 if err != nil { 340 return "", fmt.Errorf("failed to call PDS: %w", err) 341 } 342 defer func() { 343 if closeErr := resp.Body.Close(); closeErr != nil { 344 log.Printf("Failed to close response body: %v", closeErr) 345 } 346 }() 347 348 if resp.StatusCode != http.StatusOK { 349 body, readErr := io.ReadAll(resp.Body) 350 if readErr != nil { 351 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 352 } 353 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 354 } 355 356 
var session CreateSessionResponse 357 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 358 return "", fmt.Errorf("failed to decode response: %w", err) 359 } 360 361 return session.AccessJwt, nil 362}