A community-based topic aggregation platform built on atproto
1package main 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/api/routes" 6 "Coves/internal/atproto/auth" 7 "Coves/internal/atproto/identity" 8 "Coves/internal/atproto/jetstream" 9 "Coves/internal/core/aggregators" 10 "Coves/internal/core/communities" 11 "Coves/internal/core/communityFeeds" 12 "Coves/internal/core/discover" 13 "Coves/internal/core/posts" 14 "Coves/internal/core/timeline" 15 "Coves/internal/core/users" 16 "bytes" 17 "context" 18 "database/sql" 19 "encoding/json" 20 "fmt" 21 "io" 22 "log" 23 "net/http" 24 "os" 25 "strings" 26 "time" 27 28 "github.com/go-chi/chi/v5" 29 chiMiddleware "github.com/go-chi/chi/v5/middleware" 30 _ "github.com/lib/pq" 31 "github.com/pressly/goose/v3" 32 33 commentsAPI "Coves/internal/api/handlers/comments" 34 "Coves/internal/core/comments" 35 postgresRepo "Coves/internal/db/postgres" 36) 37 38func main() { 39 // Database configuration (AppView database) 40 dbURL := os.Getenv("DATABASE_URL") 41 if dbURL == "" { 42 // Use dev database from .env.dev 43 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 44 } 45 46 // Default PDS URL for this Coves instance (supports self-hosting) 47 defaultPDS := os.Getenv("PDS_URL") 48 if defaultPDS == "" { 49 defaultPDS = "http://localhost:3001" // Local dev PDS 50 } 51 52 // Cursor secret for HMAC signing (prevents cursor manipulation) 53 cursorSecret := os.Getenv("CURSOR_SECRET") 54 if cursorSecret == "" { 55 // Generate a random secret if not set (dev mode) 56 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value 57 cursorSecret = "dev-cursor-secret-change-in-production" 58 log.Println("⚠️ WARNING: Using default cursor secret. 
Set CURSOR_SECRET env var in production!") 59 } 60 61 db, err := sql.Open("postgres", dbURL) 62 if err != nil { 63 log.Fatal("Failed to connect to database:", err) 64 } 65 defer func() { 66 if closeErr := db.Close(); closeErr != nil { 67 log.Printf("Failed to close database connection: %v", closeErr) 68 } 69 }() 70 71 if err = db.Ping(); err != nil { 72 log.Fatal("Failed to ping database:", err) 73 } 74 75 log.Println("Connected to AppView database") 76 77 // Run migrations 78 if err = goose.SetDialect("postgres"); err != nil { 79 log.Fatal("Failed to set goose dialect:", err) 80 } 81 82 if err = goose.Up(db, "internal/db/migrations"); err != nil { 83 log.Fatal("Failed to run migrations:", err) 84 } 85 86 log.Println("Migrations completed successfully") 87 88 r := chi.NewRouter() 89 90 r.Use(chiMiddleware.Logger) 91 r.Use(chiMiddleware.Recoverer) 92 r.Use(chiMiddleware.RequestID) 93 94 // Rate limiting: 100 requests per minute per IP 95 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 96 r.Use(rateLimiter.Middleware) 97 98 // Initialize identity resolver 99 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 100 // directory as DID registration to ensure E2E tests work without hitting 101 // the production plc.directory 102 identityConfig := identity.DefaultConfig() 103 104 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 105 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 106 if plcDirectoryURL == "" { 107 plcDirectoryURL = "https://plc.directory" // Default to production PLC 108 } 109 110 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 111 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 112 if isDevEnv { 113 identityConfig.PLCURL = plcDirectoryURL 114 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 115 } else { 116 // Production: Allow separate IDENTITY_PLC_URL for read operations 117 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != 
"" { 118 identityConfig.PLCURL = identityPLCURL 119 } else { 120 identityConfig.PLCURL = plcDirectoryURL 121 } 122 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 123 } 124 125 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 126 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 127 identityConfig.CacheTTL = duration 128 } 129 } 130 131 identityResolver := identity.NewResolver(db, identityConfig) 132 133 // Initialize atProto auth middleware for JWT validation 134 // Phase 1: Set skipVerify=true to test JWT parsing only 135 // Phase 2: Set skipVerify=false to enable full signature verification 136 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true" 137 if skipVerify { 138 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)") 139 log.Println(" Set AUTH_SKIP_VERIFY=false for production") 140 } 141 142 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour 143 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL) 144 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify) 145 log.Println("✅ atProto auth middleware initialized") 146 147 // Initialize repositories and services 148 userRepo := postgresRepo.NewUserRepository(db) 149 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 150 151 communityRepo := postgresRepo.NewCommunityRepository(db) 152 153 // V2.0: PDS-managed DID generation 154 // Community DIDs and keys are generated entirely by the PDS 155 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 156 157 instanceDID := os.Getenv("INSTANCE_DID") 158 if instanceDID == "" { 159 instanceDID = "did:web:coves.social" // Default for development 160 } 161 162 // V2: Extract instance domain for community handles 163 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 164 // We cannot allow arbitrary domains to prevent impersonation attacks 165 // Example 
attack: !leagueoflegends@riotgames.com on a non-Riot instance 166 // 167 // SECURITY: did:web domain verification is implemented in the Jetstream consumer 168 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim() 169 // Communities with mismatched hostedBy domains are rejected during indexing 170 var instanceDomain string 171 if strings.HasPrefix(instanceDID, "did:web:") { 172 // Extract domain from did:web (this is the authoritative source) 173 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 174 } else { 175 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 176 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 177 if instanceDomain == "" { 178 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 179 } 180 } 181 182 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 183 184 // V2.0: Initialize PDS account provisioner for communities (simplified) 185 // PDS handles all DID and key generation - no Coves-side cryptography needed 186 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 187 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 188 log.Printf(" - Communities will be created at: %s", defaultPDS) 189 log.Printf(" - PDS will generate and manage all DIDs and keys") 190 191 // Initialize community service (no longer needs didGenerator directly) 192 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 193 194 // Authenticate Coves instance with PDS to enable community record writes 195 // The instance needs a PDS account to write community records it owns 196 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 197 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 198 if pdsHandle != "" && pdsPassword != "" { 199 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 200 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, 
pdsPassword) 201 if authErr != nil { 202 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 203 log.Println("Community creation will fail until PDS authentication is configured") 204 } else { 205 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 206 svc.SetPDSAccessToken(accessToken) 207 log.Println("✓ Coves instance authenticated with PDS") 208 } 209 } 210 } else { 211 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 212 log.Println("Community creation via write-forward is disabled") 213 } 214 215 // Start Jetstream consumer for read-forward user indexing 216 jetstreamURL := os.Getenv("JETSTREAM_URL") 217 if jetstreamURL == "" { 218 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 219 } 220 221 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 222 223 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 224 ctx := context.Background() 225 go func() { 226 if startErr := userConsumer.Start(ctx); startErr != nil { 227 log.Printf("Jetstream consumer stopped: %v", startErr) 228 } 229 }() 230 231 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 232 233 // Start Jetstream consumer for community events (profiles and subscriptions) 234 // This consumer indexes: 235 // 1. Community profiles (social.coves.community.profile) - in community's own repo 236 // 2. 
User subscriptions (social.coves.community.subscription) - in user's repo 237 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL") 238 if communityJetstreamURL == "" { 239 // Local Jetstream for communities - filter to our instance's collections 240 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe) 241 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures 242 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription" 243 } 244 245 // Initialize community event consumer with did:web verification 246 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true" 247 if skipDIDWebVerification { 248 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)") 249 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production") 250 } 251 252 // Pass identity resolver to consumer for PLC handle resolution (source of truth) 253 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver) 254 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL) 255 256 go func() { 257 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil { 258 log.Printf("Community Jetstream consumer stopped: %v", startErr) 259 } 260 }() 261 262 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL) 263 log.Println(" - Indexing: social.coves.community.profile (community profiles)") 264 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)") 265 266 // Start JWKS cache cleanup background job 267 go func() { 268 ticker := time.NewTicker(1 * time.Hour) 269 defer ticker.Stop() 270 for range ticker.C { 271 jwksFetcher.CleanupExpiredCache() 272 log.Println("JWKS cache cleanup 
completed") 273 } 274 }() 275 276 log.Println("Started JWKS cache cleanup background job (runs hourly)") 277 278 // Initialize aggregator service 279 aggregatorRepo := postgresRepo.NewAggregatorRepository(db) 280 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 281 log.Println("✅ Aggregator service initialized") 282 283 // Initialize post service (with aggregator support) 284 postRepo := postgresRepo.NewPostRepository(db) 285 postService := posts.NewPostService(postRepo, communityService, aggregatorService, defaultPDS) 286 287 // Initialize vote repository (used by Jetstream consumer for indexing) 288 voteRepo := postgresRepo.NewVoteRepository(db) 289 log.Println("✅ Vote repository initialized (Jetstream indexing only)") 290 291 // Initialize comment repository (used by Jetstream consumer for indexing) 292 commentRepo := postgresRepo.NewCommentRepository(db) 293 log.Println("✅ Comment repository initialized (Jetstream indexing only)") 294 295 // Initialize comment service (for query API) 296 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo) 297 log.Println("✅ Comment service initialized") 298 299 // Initialize feed service 300 feedRepo := postgresRepo.NewCommunityFeedRepository(db) 301 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 302 log.Println("✅ Feed service initialized") 303 304 // Initialize timeline service (home feed from subscribed communities) 305 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret) 306 timelineService := timeline.NewTimelineService(timelineRepo) 307 log.Println("✅ Timeline service initialized") 308 309 // Initialize discover service (public feed from all communities) 310 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret) 311 discoverService := discover.NewDiscoverService(discoverRepo) 312 log.Println("✅ Discover service initialized") 313 314 // Start Jetstream consumer for posts 315 // This consumer 
indexes posts created in community repositories via the firehose 316 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist 317 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL") 318 if postJetstreamURL == "" { 319 // Listen to post record creation events 320 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post" 321 } 322 323 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 324 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL) 325 326 go func() { 327 if startErr := postJetstreamConnector.Start(ctx); startErr != nil { 328 log.Printf("Post Jetstream consumer stopped: %v", startErr) 329 } 330 }() 331 332 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL) 333 log.Println(" - Indexing: social.coves.community.post CREATE operations") 334 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented") 335 336 // Start Jetstream consumer for aggregators 337 // This consumer indexes aggregator service declarations and authorization records 338 // Following Bluesky's pattern for feed generators and labelers 339 // NOTE: Uses the same Jetstream as communities, just filtering different collections 340 aggregatorJetstreamURL := communityJetstreamURL 341 // Override if specific URL needed for testing 342 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" { 343 aggregatorJetstreamURL = envURL 344 } else if aggregatorJetstreamURL == "" { 345 // Fallback if community URL also not set 346 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization" 347 } 348 349 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo) 350 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, 
aggregatorJetstreamURL) 351 352 go func() { 353 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil { 354 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr) 355 } 356 }() 357 358 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL) 359 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)") 360 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)") 361 362 // Start Jetstream consumer for votes 363 // This consumer indexes votes from user repositories and updates post vote counts 364 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL") 365 if voteJetstreamURL == "" { 366 // Listen to vote record CREATE/DELETE events from user repositories 367 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote" 368 } 369 370 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db) 371 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL) 372 373 go func() { 374 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil { 375 log.Printf("Vote Jetstream consumer stopped: %v", startErr) 376 } 377 }() 378 379 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL) 380 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations") 381 log.Println(" - Updating: Post vote counts atomically") 382 383 // Start Jetstream consumer for comments 384 // This consumer indexes comments from user repositories and updates parent counts 385 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL") 386 if commentJetstreamURL == "" { 387 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories 388 commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.comment" 389 } 390 391 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 392 commentJetstreamConnector := 
jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL) 393 394 go func() { 395 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil { 396 log.Printf("Comment Jetstream consumer stopped: %v", startErr) 397 } 398 }() 399 400 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL) 401 log.Println(" - Indexing: social.coves.feed.comment CREATE/UPDATE/DELETE operations") 402 log.Println(" - Updating: Post comment counts and comment reply counts atomically") 403 404 // Register XRPC routes 405 routes.RegisterUserRoutes(r, userService) 406 routes.RegisterCommunityRoutes(r, communityService, authMiddleware) 407 log.Println("Community XRPC endpoints registered with OAuth authentication") 408 409 routes.RegisterPostRoutes(r, postService, authMiddleware) 410 log.Println("Post XRPC endpoints registered with OAuth authentication") 411 412 // Vote write endpoints removed - clients write directly to their PDS 413 // The AppView indexes votes from Jetstream (see vote consumer above) 414 415 routes.RegisterCommunityFeedRoutes(r, feedService) 416 log.Println("Feed XRPC endpoints registered (public, no auth required)") 417 418 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware) 419 log.Println("Timeline XRPC endpoints registered (requires authentication)") 420 421 routes.RegisterDiscoverRoutes(r, discoverService) 422 log.Println("Discover XRPC endpoints registered (public, no auth required)") 423 424 routes.RegisterAggregatorRoutes(r, aggregatorService) 425 log.Println("Aggregator XRPC endpoints registered (query endpoints public)") 426 427 // Comment query API - supports optional authentication for viewer state 428 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService) 429 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter) 430 r.Handle( 431 "/xrpc/social.coves.community.comment.getComments", 432 commentsAPI.OptionalAuthMiddleware(authMiddleware, 
commentHandler.HandleGetComments), 433 ) 434 log.Println("✅ Comment query API registered") 435 log.Println(" - GET /xrpc/social.coves.community.comment.getComments") 436 437 r.Get("/health", func(w http.ResponseWriter, r *http.Request) { 438 w.WriteHeader(http.StatusOK) 439 if _, err := w.Write([]byte("OK")); err != nil { 440 log.Printf("Failed to write health check response: %v", err) 441 } 442 }) 443 444 port := os.Getenv("APPVIEW_PORT") 445 if port == "" { 446 port = "8081" // Match .env.dev default 447 } 448 449 fmt.Printf("Coves AppView starting on port %s\n", port) 450 fmt.Printf("Default PDS: %s\n", defaultPDS) 451 log.Fatal(http.ListenAndServe(":"+port, r)) 452} 453 454// authenticateWithPDS creates a session on the PDS and returns an access token 455func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 456 type CreateSessionRequest struct { 457 Identifier string `json:"identifier"` 458 Password string `json:"password"` 459 } 460 461 type CreateSessionResponse struct { 462 DID string `json:"did"` 463 Handle string `json:"handle"` 464 AccessJwt string `json:"accessJwt"` 465 } 466 467 reqBody, err := json.Marshal(CreateSessionRequest{ 468 Identifier: handle, 469 Password: password, 470 }) 471 if err != nil { 472 return "", fmt.Errorf("failed to marshal request: %w", err) 473 } 474 475 resp, err := http.Post( 476 pdsURL+"/xrpc/com.atproto.server.createSession", 477 "application/json", 478 bytes.NewReader(reqBody), 479 ) 480 if err != nil { 481 return "", fmt.Errorf("failed to call PDS: %w", err) 482 } 483 defer func() { 484 if closeErr := resp.Body.Close(); closeErr != nil { 485 log.Printf("Failed to close response body: %v", closeErr) 486 } 487 }() 488 489 if resp.StatusCode != http.StatusOK { 490 body, readErr := io.ReadAll(resp.Body) 491 if readErr != nil { 492 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 493 } 494 return "", fmt.Errorf("PDS returned status %d: %s", 
resp.StatusCode, string(body)) 495 } 496 497 var session CreateSessionResponse 498 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 499 return "", fmt.Errorf("failed to decode response: %w", err) 500 } 501 502 return session.AccessJwt, nil 503}