A community-based topic aggregation platform built on atproto.
1package main 2 3import ( 4 "bytes" 5 "context" 6 "database/sql" 7 "encoding/json" 8 "fmt" 9 "io" 10 "log" 11 "net/http" 12 "os" 13 "strings" 14 "time" 15 16 "Coves/internal/api/middleware" 17 "Coves/internal/api/routes" 18 "Coves/internal/atproto/auth" 19 "Coves/internal/atproto/identity" 20 "Coves/internal/atproto/jetstream" 21 "Coves/internal/core/aggregators" 22 "Coves/internal/core/comments" 23 "Coves/internal/core/communities" 24 "Coves/internal/core/communityFeeds" 25 "Coves/internal/core/discover" 26 "Coves/internal/core/posts" 27 "Coves/internal/core/timeline" 28 "Coves/internal/core/users" 29 30 "github.com/go-chi/chi/v5" 31 chiMiddleware "github.com/go-chi/chi/v5/middleware" 32 _ "github.com/lib/pq" 33 "github.com/pressly/goose/v3" 34 35 commentsAPI "Coves/internal/api/handlers/comments" 36 37 postgresRepo "Coves/internal/db/postgres" 38) 39 40func main() { 41 // Database configuration (AppView database) 42 dbURL := os.Getenv("DATABASE_URL") 43 if dbURL == "" { 44 // Use dev database from .env.dev 45 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 46 } 47 48 // Default PDS URL for this Coves instance (supports self-hosting) 49 defaultPDS := os.Getenv("PDS_URL") 50 if defaultPDS == "" { 51 defaultPDS = "http://localhost:3001" // Local dev PDS 52 } 53 54 // Cursor secret for HMAC signing (prevents cursor manipulation) 55 cursorSecret := os.Getenv("CURSOR_SECRET") 56 if cursorSecret == "" { 57 // Generate a random secret if not set (dev mode) 58 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value 59 cursorSecret = "dev-cursor-secret-change-in-production" 60 log.Println("⚠️ WARNING: Using default cursor secret. 
Set CURSOR_SECRET env var in production!") 61 } 62 63 db, err := sql.Open("postgres", dbURL) 64 if err != nil { 65 log.Fatal("Failed to connect to database:", err) 66 } 67 defer func() { 68 if closeErr := db.Close(); closeErr != nil { 69 log.Printf("Failed to close database connection: %v", closeErr) 70 } 71 }() 72 73 if err = db.Ping(); err != nil { 74 log.Fatal("Failed to ping database:", err) 75 } 76 77 log.Println("Connected to AppView database") 78 79 // Run migrations 80 if err = goose.SetDialect("postgres"); err != nil { 81 log.Fatal("Failed to set goose dialect:", err) 82 } 83 84 if err = goose.Up(db, "internal/db/migrations"); err != nil { 85 log.Fatal("Failed to run migrations:", err) 86 } 87 88 log.Println("Migrations completed successfully") 89 90 r := chi.NewRouter() 91 92 r.Use(chiMiddleware.Logger) 93 r.Use(chiMiddleware.Recoverer) 94 r.Use(chiMiddleware.RequestID) 95 96 // Rate limiting: 100 requests per minute per IP 97 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 98 r.Use(rateLimiter.Middleware) 99 100 // Initialize identity resolver 101 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 102 // directory as DID registration to ensure E2E tests work without hitting 103 // the production plc.directory 104 identityConfig := identity.DefaultConfig() 105 106 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 107 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 108 if plcDirectoryURL == "" { 109 plcDirectoryURL = "https://plc.directory" // Default to production PLC 110 } 111 112 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 113 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 114 if isDevEnv { 115 identityConfig.PLCURL = plcDirectoryURL 116 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 117 } else { 118 // Production: Allow separate IDENTITY_PLC_URL for read operations 119 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != 
"" { 120 identityConfig.PLCURL = identityPLCURL 121 } else { 122 identityConfig.PLCURL = plcDirectoryURL 123 } 124 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 125 } 126 127 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 128 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 129 identityConfig.CacheTTL = duration 130 } 131 } 132 133 identityResolver := identity.NewResolver(db, identityConfig) 134 135 // Initialize atProto auth middleware for JWT validation 136 // Phase 1: Set skipVerify=true to test JWT parsing only 137 // Phase 2: Set skipVerify=false to enable full signature verification 138 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true" 139 if skipVerify { 140 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)") 141 log.Println(" Set AUTH_SKIP_VERIFY=false for production") 142 } 143 144 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour 145 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL) 146 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify) 147 log.Println("✅ atProto auth middleware initialized") 148 149 // Initialize repositories and services 150 userRepo := postgresRepo.NewUserRepository(db) 151 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 152 153 communityRepo := postgresRepo.NewCommunityRepository(db) 154 155 // V2.0: PDS-managed DID generation 156 // Community DIDs and keys are generated entirely by the PDS 157 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 158 159 instanceDID := os.Getenv("INSTANCE_DID") 160 if instanceDID == "" { 161 instanceDID = "did:web:coves.social" // Default for development 162 } 163 164 // V2: Extract instance domain for community handles 165 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 166 // We cannot allow arbitrary domains to prevent impersonation attacks 167 // Example 
attack: !leagueoflegends@riotgames.com on a non-Riot instance 168 // 169 // SECURITY: did:web domain verification is implemented in the Jetstream consumer 170 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim() 171 // Communities with mismatched hostedBy domains are rejected during indexing 172 var instanceDomain string 173 if strings.HasPrefix(instanceDID, "did:web:") { 174 // Extract domain from did:web (this is the authoritative source) 175 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 176 } else { 177 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 178 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 179 if instanceDomain == "" { 180 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 181 } 182 } 183 184 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 185 186 // V2.0: Initialize PDS account provisioner for communities (simplified) 187 // PDS handles all DID and key generation - no Coves-side cryptography needed 188 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 189 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 190 log.Printf(" - Communities will be created at: %s", defaultPDS) 191 log.Printf(" - PDS will generate and manage all DIDs and keys") 192 193 // Initialize community service (no longer needs didGenerator directly) 194 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 195 196 // Authenticate Coves instance with PDS to enable community record writes 197 // The instance needs a PDS account to write community records it owns 198 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 199 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 200 if pdsHandle != "" && pdsPassword != "" { 201 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 202 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, 
pdsPassword) 203 if authErr != nil { 204 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 205 log.Println("Community creation will fail until PDS authentication is configured") 206 } else { 207 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 208 svc.SetPDSAccessToken(accessToken) 209 log.Println("✓ Coves instance authenticated with PDS") 210 } 211 } 212 } else { 213 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 214 log.Println("Community creation via write-forward is disabled") 215 } 216 217 // Start Jetstream consumer for read-forward user indexing 218 jetstreamURL := os.Getenv("JETSTREAM_URL") 219 if jetstreamURL == "" { 220 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 221 } 222 223 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 224 225 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 226 ctx := context.Background() 227 go func() { 228 if startErr := userConsumer.Start(ctx); startErr != nil { 229 log.Printf("Jetstream consumer stopped: %v", startErr) 230 } 231 }() 232 233 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 234 235 // Start Jetstream consumer for community events (profiles and subscriptions) 236 // This consumer indexes: 237 // 1. Community profiles (social.coves.community.profile) - in community's own repo 238 // 2. 
User subscriptions (social.coves.community.subscription) - in user's repo 239 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL") 240 if communityJetstreamURL == "" { 241 // Local Jetstream for communities - filter to our instance's collections 242 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe) 243 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures 244 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription" 245 } 246 247 // Initialize community event consumer with did:web verification 248 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true" 249 if skipDIDWebVerification { 250 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)") 251 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production") 252 } 253 254 // Pass identity resolver to consumer for PLC handle resolution (source of truth) 255 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver) 256 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL) 257 258 go func() { 259 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil { 260 log.Printf("Community Jetstream consumer stopped: %v", startErr) 261 } 262 }() 263 264 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL) 265 log.Println(" - Indexing: social.coves.community.profile (community profiles)") 266 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)") 267 268 // Start JWKS cache cleanup background job 269 go func() { 270 ticker := time.NewTicker(1 * time.Hour) 271 defer ticker.Stop() 272 for range ticker.C { 273 jwksFetcher.CleanupExpiredCache() 274 log.Println("JWKS cache cleanup 
completed") 275 } 276 }() 277 278 log.Println("Started JWKS cache cleanup background job (runs hourly)") 279 280 // Initialize aggregator service 281 aggregatorRepo := postgresRepo.NewAggregatorRepository(db) 282 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 283 log.Println("✅ Aggregator service initialized") 284 285 // Initialize post service (with aggregator support) 286 postRepo := postgresRepo.NewPostRepository(db) 287 postService := posts.NewPostService(postRepo, communityService, aggregatorService, defaultPDS) 288 289 // Initialize vote repository (used by Jetstream consumer for indexing) 290 voteRepo := postgresRepo.NewVoteRepository(db) 291 log.Println("✅ Vote repository initialized (Jetstream indexing only)") 292 293 // Initialize comment repository (used by Jetstream consumer for indexing) 294 commentRepo := postgresRepo.NewCommentRepository(db) 295 log.Println("✅ Comment repository initialized (Jetstream indexing only)") 296 297 // Initialize comment service (for query API) 298 // Requires user and community repos for proper author/community hydration per lexicon 299 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo) 300 log.Println("✅ Comment service initialized (with author/community hydration)") 301 302 // Initialize feed service 303 feedRepo := postgresRepo.NewCommunityFeedRepository(db) 304 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 305 log.Println("✅ Feed service initialized") 306 307 // Initialize timeline service (home feed from subscribed communities) 308 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret) 309 timelineService := timeline.NewTimelineService(timelineRepo) 310 log.Println("✅ Timeline service initialized") 311 312 // Initialize discover service (public feed from all communities) 313 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret) 314 discoverService := 
discover.NewDiscoverService(discoverRepo) 315 log.Println("✅ Discover service initialized") 316 317 // Start Jetstream consumer for posts 318 // This consumer indexes posts created in community repositories via the firehose 319 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist 320 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL") 321 if postJetstreamURL == "" { 322 // Listen to post record creation events 323 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post" 324 } 325 326 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db) 327 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL) 328 329 go func() { 330 if startErr := postJetstreamConnector.Start(ctx); startErr != nil { 331 log.Printf("Post Jetstream consumer stopped: %v", startErr) 332 } 333 }() 334 335 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL) 336 log.Println(" - Indexing: social.coves.community.post CREATE operations") 337 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented") 338 339 // Start Jetstream consumer for aggregators 340 // This consumer indexes aggregator service declarations and authorization records 341 // Following Bluesky's pattern for feed generators and labelers 342 // NOTE: Uses the same Jetstream as communities, just filtering different collections 343 aggregatorJetstreamURL := communityJetstreamURL 344 // Override if specific URL needed for testing 345 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" { 346 aggregatorJetstreamURL = envURL 347 } else if aggregatorJetstreamURL == "" { 348 // Fallback if community URL also not set 349 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization" 350 } 351 352 aggregatorEventConsumer := 
jetstream.NewAggregatorEventConsumer(aggregatorRepo) 353 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL) 354 355 go func() { 356 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil { 357 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr) 358 } 359 }() 360 361 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL) 362 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)") 363 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)") 364 365 // Start Jetstream consumer for votes 366 // This consumer indexes votes from user repositories and updates post vote counts 367 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL") 368 if voteJetstreamURL == "" { 369 // Listen to vote record CREATE/DELETE events from user repositories 370 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote" 371 } 372 373 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db) 374 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL) 375 376 go func() { 377 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil { 378 log.Printf("Vote Jetstream consumer stopped: %v", startErr) 379 } 380 }() 381 382 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL) 383 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations") 384 log.Println(" - Updating: Post vote counts atomically") 385 386 // Start Jetstream consumer for comments 387 // This consumer indexes comments from user repositories and updates parent counts 388 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL") 389 if commentJetstreamURL == "" { 390 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories 391 commentJetstreamURL = 
"ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.comment" 392 } 393 394 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 395 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL) 396 397 go func() { 398 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil { 399 log.Printf("Comment Jetstream consumer stopped: %v", startErr) 400 } 401 }() 402 403 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL) 404 log.Println(" - Indexing: social.coves.feed.comment CREATE/UPDATE/DELETE operations") 405 log.Println(" - Updating: Post comment counts and comment reply counts atomically") 406 407 // Register XRPC routes 408 routes.RegisterUserRoutes(r, userService) 409 routes.RegisterCommunityRoutes(r, communityService, authMiddleware) 410 log.Println("Community XRPC endpoints registered with OAuth authentication") 411 412 routes.RegisterPostRoutes(r, postService, authMiddleware) 413 log.Println("Post XRPC endpoints registered with OAuth authentication") 414 415 // Vote write endpoints removed - clients write directly to their PDS 416 // The AppView indexes votes from Jetstream (see vote consumer above) 417 418 routes.RegisterCommunityFeedRoutes(r, feedService) 419 log.Println("Feed XRPC endpoints registered (public, no auth required)") 420 421 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware) 422 log.Println("Timeline XRPC endpoints registered (requires authentication)") 423 424 routes.RegisterDiscoverRoutes(r, discoverService) 425 log.Println("Discover XRPC endpoints registered (public, no auth required)") 426 427 routes.RegisterAggregatorRoutes(r, aggregatorService) 428 log.Println("Aggregator XRPC endpoints registered (query endpoints public)") 429 430 // Comment query API - supports optional authentication for viewer state 431 // Stricter rate limiting for expensive nested comment queries 432 commentRateLimiter := 
middleware.NewRateLimiter(20, 1*time.Minute) 433 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService) 434 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter) 435 r.Handle( 436 "/xrpc/social.coves.community.comment.getComments", 437 commentRateLimiter.Middleware( 438 commentsAPI.OptionalAuthMiddleware(authMiddleware, commentHandler.HandleGetComments), 439 ), 440 ) 441 log.Println("✅ Comment query API registered (20 req/min rate limit)") 442 log.Println(" - GET /xrpc/social.coves.community.comment.getComments") 443 444 r.Get("/health", func(w http.ResponseWriter, r *http.Request) { 445 w.WriteHeader(http.StatusOK) 446 if _, err := w.Write([]byte("OK")); err != nil { 447 log.Printf("Failed to write health check response: %v", err) 448 } 449 }) 450 451 port := os.Getenv("APPVIEW_PORT") 452 if port == "" { 453 port = "8081" // Match .env.dev default 454 } 455 456 fmt.Printf("Coves AppView starting on port %s\n", port) 457 fmt.Printf("Default PDS: %s\n", defaultPDS) 458 log.Fatal(http.ListenAndServe(":"+port, r)) 459} 460 461// authenticateWithPDS creates a session on the PDS and returns an access token 462func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 463 type CreateSessionRequest struct { 464 Identifier string `json:"identifier"` 465 Password string `json:"password"` 466 } 467 468 type CreateSessionResponse struct { 469 DID string `json:"did"` 470 Handle string `json:"handle"` 471 AccessJwt string `json:"accessJwt"` 472 } 473 474 reqBody, err := json.Marshal(CreateSessionRequest{ 475 Identifier: handle, 476 Password: password, 477 }) 478 if err != nil { 479 return "", fmt.Errorf("failed to marshal request: %w", err) 480 } 481 482 resp, err := http.Post( 483 pdsURL+"/xrpc/com.atproto.server.createSession", 484 "application/json", 485 bytes.NewReader(reqBody), 486 ) 487 if err != nil { 488 return "", fmt.Errorf("failed to call PDS: %w", err) 489 } 490 defer func() { 491 if closeErr := 
resp.Body.Close(); closeErr != nil { 492 log.Printf("Failed to close response body: %v", closeErr) 493 } 494 }() 495 496 if resp.StatusCode != http.StatusOK { 497 body, readErr := io.ReadAll(resp.Body) 498 if readErr != nil { 499 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 500 } 501 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 502 } 503 504 var session CreateSessionResponse 505 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 506 return "", fmt.Errorf("failed to decode response: %w", err) 507 } 508 509 return session.AccessJwt, nil 510}