A community-based topic aggregation platform built on atproto.
1package main 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/api/routes" 6 "Coves/internal/atproto/auth" 7 "Coves/internal/atproto/identity" 8 "Coves/internal/atproto/jetstream" 9 "Coves/internal/core/aggregators" 10 "Coves/internal/core/comments" 11 "Coves/internal/core/communities" 12 "Coves/internal/core/communityFeeds" 13 "Coves/internal/core/discover" 14 "Coves/internal/core/posts" 15 "Coves/internal/core/timeline" 16 "Coves/internal/core/users" 17 "bytes" 18 "context" 19 "database/sql" 20 "encoding/json" 21 "fmt" 22 "io" 23 "log" 24 "net/http" 25 "os" 26 "strings" 27 "time" 28 29 "github.com/go-chi/chi/v5" 30 chiMiddleware "github.com/go-chi/chi/v5/middleware" 31 _ "github.com/lib/pq" 32 "github.com/pressly/goose/v3" 33 34 commentsAPI "Coves/internal/api/handlers/comments" 35 36 postgresRepo "Coves/internal/db/postgres" 37) 38 39func main() { 40 // Database configuration (AppView database) 41 dbURL := os.Getenv("DATABASE_URL") 42 if dbURL == "" { 43 // Use dev database from .env.dev 44 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 45 } 46 47 // Default PDS URL for this Coves instance (supports self-hosting) 48 defaultPDS := os.Getenv("PDS_URL") 49 if defaultPDS == "" { 50 defaultPDS = "http://localhost:3001" // Local dev PDS 51 } 52 53 // Cursor secret for HMAC signing (prevents cursor manipulation) 54 cursorSecret := os.Getenv("CURSOR_SECRET") 55 if cursorSecret == "" { 56 // Generate a random secret if not set (dev mode) 57 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value 58 cursorSecret = "dev-cursor-secret-change-in-production" 59 log.Println("⚠️ WARNING: Using default cursor secret. 
Set CURSOR_SECRET env var in production!") 60 } 61 62 db, err := sql.Open("postgres", dbURL) 63 if err != nil { 64 log.Fatal("Failed to connect to database:", err) 65 } 66 defer func() { 67 if closeErr := db.Close(); closeErr != nil { 68 log.Printf("Failed to close database connection: %v", closeErr) 69 } 70 }() 71 72 if err = db.Ping(); err != nil { 73 log.Fatal("Failed to ping database:", err) 74 } 75 76 log.Println("Connected to AppView database") 77 78 // Run migrations 79 if err = goose.SetDialect("postgres"); err != nil { 80 log.Fatal("Failed to set goose dialect:", err) 81 } 82 83 if err = goose.Up(db, "internal/db/migrations"); err != nil { 84 log.Fatal("Failed to run migrations:", err) 85 } 86 87 log.Println("Migrations completed successfully") 88 89 r := chi.NewRouter() 90 91 r.Use(chiMiddleware.Logger) 92 r.Use(chiMiddleware.Recoverer) 93 r.Use(chiMiddleware.RequestID) 94 95 // Rate limiting: 100 requests per minute per IP 96 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 97 r.Use(rateLimiter.Middleware) 98 99 // Initialize identity resolver 100 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 101 // directory as DID registration to ensure E2E tests work without hitting 102 // the production plc.directory 103 identityConfig := identity.DefaultConfig() 104 105 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 106 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 107 if plcDirectoryURL == "" { 108 plcDirectoryURL = "https://plc.directory" // Default to production PLC 109 } 110 111 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 112 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 113 if isDevEnv { 114 identityConfig.PLCURL = plcDirectoryURL 115 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 116 } else { 117 // Production: Allow separate IDENTITY_PLC_URL for read operations 118 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != 
"" { 119 identityConfig.PLCURL = identityPLCURL 120 } else { 121 identityConfig.PLCURL = plcDirectoryURL 122 } 123 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 124 } 125 126 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 127 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 128 identityConfig.CacheTTL = duration 129 } 130 } 131 132 identityResolver := identity.NewResolver(db, identityConfig) 133 134 // Initialize atProto auth middleware for JWT validation 135 // Phase 1: Set skipVerify=true to test JWT parsing only 136 // Phase 2: Set skipVerify=false to enable full signature verification 137 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true" 138 if skipVerify { 139 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)") 140 log.Println(" Set AUTH_SKIP_VERIFY=false for production") 141 } 142 143 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour 144 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL) 145 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify) 146 log.Println("✅ atProto auth middleware initialized") 147 148 // Initialize repositories and services 149 userRepo := postgresRepo.NewUserRepository(db) 150 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 151 152 communityRepo := postgresRepo.NewCommunityRepository(db) 153 154 // V2.0: PDS-managed DID generation 155 // Community DIDs and keys are generated entirely by the PDS 156 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 157 158 instanceDID := os.Getenv("INSTANCE_DID") 159 if instanceDID == "" { 160 instanceDID = "did:web:coves.social" // Default for development 161 } 162 163 // V2: Extract instance domain for community handles 164 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 165 // We cannot allow arbitrary domains to prevent impersonation attacks 166 // Example 
attack: !leagueoflegends@riotgames.com on a non-Riot instance 167 // 168 // SECURITY: did:web domain verification is implemented in the Jetstream consumer 169 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim() 170 // Communities with mismatched hostedBy domains are rejected during indexing 171 var instanceDomain string 172 if strings.HasPrefix(instanceDID, "did:web:") { 173 // Extract domain from did:web (this is the authoritative source) 174 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 175 } else { 176 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 177 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 178 if instanceDomain == "" { 179 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 180 } 181 } 182 183 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 184 185 // V2.0: Initialize PDS account provisioner for communities (simplified) 186 // PDS handles all DID and key generation - no Coves-side cryptography needed 187 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 188 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 189 log.Printf(" - Communities will be created at: %s", defaultPDS) 190 log.Printf(" - PDS will generate and manage all DIDs and keys") 191 192 // Initialize community service (no longer needs didGenerator directly) 193 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 194 195 // Authenticate Coves instance with PDS to enable community record writes 196 // The instance needs a PDS account to write community records it owns 197 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 198 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 199 if pdsHandle != "" && pdsPassword != "" { 200 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 201 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, 
pdsPassword) 202 if authErr != nil { 203 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 204 log.Println("Community creation will fail until PDS authentication is configured") 205 } else { 206 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 207 svc.SetPDSAccessToken(accessToken) 208 log.Println("✓ Coves instance authenticated with PDS") 209 } 210 } 211 } else { 212 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 213 log.Println("Community creation via write-forward is disabled") 214 } 215 216 // Start Jetstream consumer for read-forward user indexing 217 jetstreamURL := os.Getenv("JETSTREAM_URL") 218 if jetstreamURL == "" { 219 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 220 } 221 222 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 223 224 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 225 ctx := context.Background() 226 go func() { 227 if startErr := userConsumer.Start(ctx); startErr != nil { 228 log.Printf("Jetstream consumer stopped: %v", startErr) 229 } 230 }() 231 232 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 233 234 // Start Jetstream consumer for community events (profiles and subscriptions) 235 // This consumer indexes: 236 // 1. Community profiles (social.coves.community.profile) - in community's own repo 237 // 2. 
User subscriptions (social.coves.community.subscription) - in user's repo 238 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL") 239 if communityJetstreamURL == "" { 240 // Local Jetstream for communities - filter to our instance's collections 241 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe) 242 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures 243 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription" 244 } 245 246 // Initialize community event consumer with did:web verification 247 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true" 248 if skipDIDWebVerification { 249 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)") 250 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production") 251 } 252 253 // Pass identity resolver to consumer for PLC handle resolution (source of truth) 254 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver) 255 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL) 256 257 go func() { 258 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil { 259 log.Printf("Community Jetstream consumer stopped: %v", startErr) 260 } 261 }() 262 263 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL) 264 log.Println(" - Indexing: social.coves.community.profile (community profiles)") 265 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)") 266 267 // Start JWKS cache cleanup background job 268 go func() { 269 ticker := time.NewTicker(1 * time.Hour) 270 defer ticker.Stop() 271 for range ticker.C { 272 jwksFetcher.CleanupExpiredCache() 273 log.Println("JWKS cache cleanup 
completed") 274 } 275 }() 276 277 log.Println("Started JWKS cache cleanup background job (runs hourly)") 278 279 // Initialize aggregator service 280 aggregatorRepo := postgresRepo.NewAggregatorRepository(db) 281 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 282 log.Println("✅ Aggregator service initialized") 283 284 // Initialize post service (with aggregator support) 285 postRepo := postgresRepo.NewPostRepository(db) 286 postService := posts.NewPostService(postRepo, communityService, aggregatorService, defaultPDS) 287 288 // Initialize vote repository (used by Jetstream consumer for indexing) 289 voteRepo := postgresRepo.NewVoteRepository(db) 290 log.Println("✅ Vote repository initialized (Jetstream indexing only)") 291 292 // Initialize comment repository (used by Jetstream consumer for indexing) 293 commentRepo := postgresRepo.NewCommentRepository(db) 294 log.Println("✅ Comment repository initialized (Jetstream indexing only)") 295 296 // Initialize comment service (for query API) 297 // Requires user and community repos for proper author/community hydration per lexicon 298 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo) 299 log.Println("✅ Comment service initialized (with author/community hydration)") 300 301 // Initialize feed service 302 feedRepo := postgresRepo.NewCommunityFeedRepository(db) 303 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 304 log.Println("✅ Feed service initialized") 305 306 // Initialize timeline service (home feed from subscribed communities) 307 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret) 308 timelineService := timeline.NewTimelineService(timelineRepo) 309 log.Println("✅ Timeline service initialized") 310 311 // Initialize discover service (public feed from all communities) 312 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret) 313 discoverService := 
discover.NewDiscoverService(discoverRepo) 314 log.Println("✅ Discover service initialized") 315 316 // Start Jetstream consumer for posts 317 // This consumer indexes posts created in community repositories via the firehose 318 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist 319 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL") 320 if postJetstreamURL == "" { 321 // Listen to post record creation events 322 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post" 323 } 324 325 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 326 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL) 327 328 go func() { 329 if startErr := postJetstreamConnector.Start(ctx); startErr != nil { 330 log.Printf("Post Jetstream consumer stopped: %v", startErr) 331 } 332 }() 333 334 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL) 335 log.Println(" - Indexing: social.coves.community.post CREATE operations") 336 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented") 337 338 // Start Jetstream consumer for aggregators 339 // This consumer indexes aggregator service declarations and authorization records 340 // Following Bluesky's pattern for feed generators and labelers 341 // NOTE: Uses the same Jetstream as communities, just filtering different collections 342 aggregatorJetstreamURL := communityJetstreamURL 343 // Override if specific URL needed for testing 344 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" { 345 aggregatorJetstreamURL = envURL 346 } else if aggregatorJetstreamURL == "" { 347 // Fallback if community URL also not set 348 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization" 349 } 350 351 aggregatorEventConsumer := 
jetstream.NewAggregatorEventConsumer(aggregatorRepo) 352 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL) 353 354 go func() { 355 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil { 356 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr) 357 } 358 }() 359 360 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL) 361 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)") 362 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)") 363 364 // Start Jetstream consumer for votes 365 // This consumer indexes votes from user repositories and updates post vote counts 366 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL") 367 if voteJetstreamURL == "" { 368 // Listen to vote record CREATE/DELETE events from user repositories 369 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote" 370 } 371 372 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db) 373 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL) 374 375 go func() { 376 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil { 377 log.Printf("Vote Jetstream consumer stopped: %v", startErr) 378 } 379 }() 380 381 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL) 382 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations") 383 log.Println(" - Updating: Post vote counts atomically") 384 385 // Start Jetstream consumer for comments 386 // This consumer indexes comments from user repositories and updates parent counts 387 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL") 388 if commentJetstreamURL == "" { 389 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories 390 commentJetstreamURL = 
"ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.comment" 391 } 392 393 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 394 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL) 395 396 go func() { 397 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil { 398 log.Printf("Comment Jetstream consumer stopped: %v", startErr) 399 } 400 }() 401 402 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL) 403 log.Println(" - Indexing: social.coves.feed.comment CREATE/UPDATE/DELETE operations") 404 log.Println(" - Updating: Post comment counts and comment reply counts atomically") 405 406 // Register XRPC routes 407 routes.RegisterUserRoutes(r, userService) 408 routes.RegisterCommunityRoutes(r, communityService, authMiddleware) 409 log.Println("Community XRPC endpoints registered with OAuth authentication") 410 411 routes.RegisterPostRoutes(r, postService, authMiddleware) 412 log.Println("Post XRPC endpoints registered with OAuth authentication") 413 414 // Vote write endpoints removed - clients write directly to their PDS 415 // The AppView indexes votes from Jetstream (see vote consumer above) 416 417 routes.RegisterCommunityFeedRoutes(r, feedService) 418 log.Println("Feed XRPC endpoints registered (public, no auth required)") 419 420 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware) 421 log.Println("Timeline XRPC endpoints registered (requires authentication)") 422 423 routes.RegisterDiscoverRoutes(r, discoverService) 424 log.Println("Discover XRPC endpoints registered (public, no auth required)") 425 426 routes.RegisterAggregatorRoutes(r, aggregatorService) 427 log.Println("Aggregator XRPC endpoints registered (query endpoints public)") 428 429 // Comment query API - supports optional authentication for viewer state 430 // Stricter rate limiting for expensive nested comment queries 431 commentRateLimiter := 
middleware.NewRateLimiter(20, 1*time.Minute) 432 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService) 433 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter) 434 r.Handle( 435 "/xrpc/social.coves.community.comment.getComments", 436 commentRateLimiter.Middleware( 437 commentsAPI.OptionalAuthMiddleware(authMiddleware, commentHandler.HandleGetComments), 438 ), 439 ) 440 log.Println("✅ Comment query API registered (20 req/min rate limit)") 441 log.Println(" - GET /xrpc/social.coves.community.comment.getComments") 442 443 r.Get("/health", func(w http.ResponseWriter, r *http.Request) { 444 w.WriteHeader(http.StatusOK) 445 if _, err := w.Write([]byte("OK")); err != nil { 446 log.Printf("Failed to write health check response: %v", err) 447 } 448 }) 449 450 port := os.Getenv("APPVIEW_PORT") 451 if port == "" { 452 port = "8081" // Match .env.dev default 453 } 454 455 fmt.Printf("Coves AppView starting on port %s\n", port) 456 fmt.Printf("Default PDS: %s\n", defaultPDS) 457 log.Fatal(http.ListenAndServe(":"+port, r)) 458} 459 460// authenticateWithPDS creates a session on the PDS and returns an access token 461func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 462 type CreateSessionRequest struct { 463 Identifier string `json:"identifier"` 464 Password string `json:"password"` 465 } 466 467 type CreateSessionResponse struct { 468 DID string `json:"did"` 469 Handle string `json:"handle"` 470 AccessJwt string `json:"accessJwt"` 471 } 472 473 reqBody, err := json.Marshal(CreateSessionRequest{ 474 Identifier: handle, 475 Password: password, 476 }) 477 if err != nil { 478 return "", fmt.Errorf("failed to marshal request: %w", err) 479 } 480 481 resp, err := http.Post( 482 pdsURL+"/xrpc/com.atproto.server.createSession", 483 "application/json", 484 bytes.NewReader(reqBody), 485 ) 486 if err != nil { 487 return "", fmt.Errorf("failed to call PDS: %w", err) 488 } 489 defer func() { 490 if closeErr := 
resp.Body.Close(); closeErr != nil { 491 log.Printf("Failed to close response body: %v", closeErr) 492 } 493 }() 494 495 if resp.StatusCode != http.StatusOK { 496 body, readErr := io.ReadAll(resp.Body) 497 if readErr != nil { 498 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 499 } 500 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 501 } 502 503 var session CreateSessionResponse 504 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 505 return "", fmt.Errorf("failed to decode response: %w", err) 506 } 507 508 return session.AccessJwt, nil 509}