A community based topic aggregation platform built on atproto
1package main 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/api/routes" 6 "Coves/internal/atproto/auth" 7 "Coves/internal/atproto/identity" 8 "Coves/internal/atproto/jetstream" 9 "Coves/internal/core/aggregators" 10 "Coves/internal/core/communities" 11 "Coves/internal/core/communityFeeds" 12 "Coves/internal/core/discover" 13 "Coves/internal/core/posts" 14 "Coves/internal/core/timeline" 15 "Coves/internal/core/users" 16 "bytes" 17 "context" 18 "database/sql" 19 "encoding/json" 20 "fmt" 21 "io" 22 "log" 23 "net/http" 24 "os" 25 "strings" 26 "time" 27 28 "github.com/go-chi/chi/v5" 29 chiMiddleware "github.com/go-chi/chi/v5/middleware" 30 _ "github.com/lib/pq" 31 "github.com/pressly/goose/v3" 32 33 postgresRepo "Coves/internal/db/postgres" 34) 35 36func main() { 37 // Database configuration (AppView database) 38 dbURL := os.Getenv("DATABASE_URL") 39 if dbURL == "" { 40 // Use dev database from .env.dev 41 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 42 } 43 44 // Default PDS URL for this Coves instance (supports self-hosting) 45 defaultPDS := os.Getenv("PDS_URL") 46 if defaultPDS == "" { 47 defaultPDS = "http://localhost:3001" // Local dev PDS 48 } 49 50 // Cursor secret for HMAC signing (prevents cursor manipulation) 51 cursorSecret := os.Getenv("CURSOR_SECRET") 52 if cursorSecret == "" { 53 // Generate a random secret if not set (dev mode) 54 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value 55 cursorSecret = "dev-cursor-secret-change-in-production" 56 log.Println("⚠️ WARNING: Using default cursor secret. Set CURSOR_SECRET env var in production!") 57 } 58 59 db, err := sql.Open("postgres", dbURL) 60 if err != nil { 61 log.Fatal("Failed to connect to database:", err) 62 } 63 defer func() { 64 if closeErr := db.Close(); closeErr != nil { 65 log.Printf("Failed to close database connection: %v", closeErr) 66 } 67 }() 68 69 if err = db.Ping(); err != nil { 70 log.Fatal("Failed to ping database:", err) 71 } 72 73 log.Println("Connected to AppView database") 74 75 // Run migrations 76 if err = goose.SetDialect("postgres"); err != nil { 77 log.Fatal("Failed to set goose dialect:", err) 78 } 79 80 if err = goose.Up(db, "internal/db/migrations"); err != nil { 81 log.Fatal("Failed to run migrations:", err) 82 } 83 84 log.Println("Migrations completed successfully") 85 86 r := chi.NewRouter() 87 88 r.Use(chiMiddleware.Logger) 89 r.Use(chiMiddleware.Recoverer) 90 r.Use(chiMiddleware.RequestID) 91 92 // Rate limiting: 100 requests per minute per IP 93 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 94 r.Use(rateLimiter.Middleware) 95 96 // Initialize identity resolver 97 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 98 // directory as DID registration to ensure E2E tests work without hitting 99 // the production plc.directory 100 identityConfig := identity.DefaultConfig() 101 102 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 103 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 104 if plcDirectoryURL == "" { 105 plcDirectoryURL = "https://plc.directory" // Default to production PLC 106 } 107 108 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 109 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 110 if isDevEnv { 111 identityConfig.PLCURL = plcDirectoryURL 112 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 113 } else { 114 // Production: Allow separate IDENTITY_PLC_URL for read operations 115 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" { 116 identityConfig.PLCURL = identityPLCURL 117 } else { 118 identityConfig.PLCURL = plcDirectoryURL 119 } 120 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 121 } 122 123 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 124 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 125 identityConfig.CacheTTL = duration 126 } 127 } 128 129 identityResolver := identity.NewResolver(db, identityConfig) 130 131 // Initialize atProto auth middleware for JWT validation 132 // Phase 1: Set skipVerify=true to test JWT parsing only 133 // Phase 2: Set skipVerify=false to enable full signature verification 134 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true" 135 if skipVerify { 136 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)") 137 log.Println(" Set AUTH_SKIP_VERIFY=false for production") 138 } 139 140 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour 141 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL) 142 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify) 143 log.Println("✅ atProto auth middleware initialized") 144 145 // Initialize repositories and services 146 userRepo := postgresRepo.NewUserRepository(db) 147 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 148 149 communityRepo := postgresRepo.NewCommunityRepository(db) 150 151 // V2.0: PDS-managed DID generation 152 // Community DIDs and keys are generated entirely by the PDS 153 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 154 155 instanceDID := os.Getenv("INSTANCE_DID") 156 if instanceDID == "" { 157 instanceDID = "did:web:coves.social" // Default for development 158 } 159 160 // V2: Extract instance domain for community handles 161 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 162 // We cannot allow arbitrary domains to prevent impersonation attacks 163 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance 164 // 165 // SECURITY: did:web domain verification is implemented in the Jetstream consumer 166 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim() 167 // Communities with mismatched hostedBy domains are rejected during indexing 168 var instanceDomain string 169 if strings.HasPrefix(instanceDID, "did:web:") { 170 // Extract domain from did:web (this is the authoritative source) 171 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 172 } else { 173 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 174 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 175 if instanceDomain == "" { 176 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 177 } 178 } 179 180 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 181 182 // V2.0: Initialize PDS account provisioner for communities (simplified) 183 // PDS handles all DID and key generation - no Coves-side cryptography needed 184 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 185 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 186 log.Printf(" - Communities will be created at: %s", defaultPDS) 187 log.Printf(" - PDS will generate and manage all DIDs and keys") 188 189 // Initialize community service (no longer needs didGenerator directly) 190 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 191 192 // Authenticate Coves instance with PDS to enable community record writes 193 // The instance needs a PDS account to write community records it owns 194 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 195 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 196 if pdsHandle != "" && pdsPassword != "" { 197 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 198 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 199 if authErr != nil { 200 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 201 log.Println("Community creation will fail until PDS authentication is configured") 202 } else { 203 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 204 svc.SetPDSAccessToken(accessToken) 205 log.Println("✓ Coves instance authenticated with PDS") 206 } 207 } 208 } else { 209 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 210 log.Println("Community creation via write-forward is disabled") 211 } 212 213 // Start Jetstream consumer for read-forward user indexing 214 jetstreamURL := os.Getenv("JETSTREAM_URL") 215 if jetstreamURL == "" { 216 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 217 } 218 219 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 220 221 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 222 ctx := context.Background() 223 go func() { 224 if startErr := userConsumer.Start(ctx); startErr != nil { 225 log.Printf("Jetstream consumer stopped: %v", startErr) 226 } 227 }() 228 229 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 230 231 // Start Jetstream consumer for community events (profiles and subscriptions) 232 // This consumer indexes: 233 // 1. Community profiles (social.coves.community.profile) - in community's own repo 234 // 2. User subscriptions (social.coves.community.subscription) - in user's repo 235 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL") 236 if communityJetstreamURL == "" { 237 // Local Jetstream for communities - filter to our instance's collections 238 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe) 239 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures 240 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription" 241 } 242 243 // Initialize community event consumer with did:web verification 244 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true" 245 if skipDIDWebVerification { 246 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)") 247 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production") 248 } 249 250 // Pass identity resolver to consumer for PLC handle resolution (source of truth) 251 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver) 252 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL) 253 254 go func() { 255 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil { 256 log.Printf("Community Jetstream consumer stopped: %v", startErr) 257 } 258 }() 259 260 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL) 261 log.Println(" - Indexing: social.coves.community.profile (community profiles)") 262 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)") 263 264 // Start JWKS cache cleanup background job 265 go func() { 266 ticker := time.NewTicker(1 * time.Hour) 267 defer ticker.Stop() 268 for range ticker.C { 269 jwksFetcher.CleanupExpiredCache() 270 log.Println("JWKS cache cleanup completed") 271 } 272 }() 273 274 log.Println("Started JWKS cache cleanup background job (runs hourly)") 275 276 // Initialize aggregator service 277 aggregatorRepo := postgresRepo.NewAggregatorRepository(db) 278 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 279 log.Println("✅ Aggregator service initialized") 280 281 // Initialize post service (with aggregator support) 282 postRepo := postgresRepo.NewPostRepository(db) 283 postService := posts.NewPostService(postRepo, communityService, aggregatorService, defaultPDS) 284 285 // Initialize vote repository (used by Jetstream consumer for indexing) 286 voteRepo := postgresRepo.NewVoteRepository(db) 287 log.Println("✅ Vote repository initialized (Jetstream indexing only)") 288 289 // Initialize comment repository (used by Jetstream consumer for indexing) 290 commentRepo := postgresRepo.NewCommentRepository(db) 291 log.Println("✅ Comment repository initialized (Jetstream indexing only)") 292 293 // Initialize feed service 294 feedRepo := postgresRepo.NewCommunityFeedRepository(db) 295 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 296 log.Println("✅ Feed service initialized") 297 298 // Initialize timeline service (home feed from subscribed communities) 299 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret) 300 timelineService := timeline.NewTimelineService(timelineRepo) 301 log.Println("✅ Timeline service initialized") 302 303 // Initialize discover service (public feed from all communities) 304 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret) 305 discoverService := discover.NewDiscoverService(discoverRepo) 306 log.Println("✅ Discover service initialized") 307 308 // Start Jetstream consumer for posts 309 // This consumer indexes posts created in community repositories via the firehose 310 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist 311 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL") 312 if postJetstreamURL == "" { 313 // Listen to post record creation events 314 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post" 315 } 316 317 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 318 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL) 319 320 go func() { 321 if startErr := postJetstreamConnector.Start(ctx); startErr != nil { 322 log.Printf("Post Jetstream consumer stopped: %v", startErr) 323 } 324 }() 325 326 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL) 327 log.Println(" - Indexing: social.coves.community.post CREATE operations") 328 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented") 329 330 // Start Jetstream consumer for aggregators 331 // This consumer indexes aggregator service declarations and authorization records 332 // Following Bluesky's pattern for feed generators and labelers 333 // NOTE: Uses the same Jetstream as communities, just filtering different collections 334 aggregatorJetstreamURL := communityJetstreamURL 335 // Override if specific URL needed for testing 336 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" { 337 aggregatorJetstreamURL = envURL 338 } else if aggregatorJetstreamURL == "" { 339 // Fallback if community URL also not set 340 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization" 341 } 342 343 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo) 344 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL) 345 346 go func() { 347 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil { 348 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr) 349 } 350 }() 351 352 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL) 353 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)") 354 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)") 355 356 // Start Jetstream consumer for votes 357 // This consumer indexes votes from user repositories and updates post vote counts 358 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL") 359 if voteJetstreamURL == "" { 360 // Listen to vote record CREATE/DELETE events from user repositories 361 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote" 362 } 363 364 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db) 365 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL) 366 367 go func() { 368 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil { 369 log.Printf("Vote Jetstream consumer stopped: %v", startErr) 370 } 371 }() 372 373 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL) 374 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations") 375 log.Println(" - Updating: Post vote counts atomically") 376 377 // Start Jetstream consumer for comments 378 // This consumer indexes comments from user repositories and updates parent counts 379 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL") 380 if commentJetstreamURL == "" { 381 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories 382 commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.comment" 383 } 384 385 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 386 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL) 387 388 go func() { 389 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil { 390 log.Printf("Comment Jetstream consumer stopped: %v", startErr) 391 } 392 }() 393 394 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL) 395 log.Println(" - Indexing: social.coves.feed.comment CREATE/UPDATE/DELETE operations") 396 log.Println(" - Updating: Post comment counts and comment reply counts atomically") 397 398 // Register XRPC routes 399 routes.RegisterUserRoutes(r, userService) 400 routes.RegisterCommunityRoutes(r, communityService, authMiddleware) 401 log.Println("Community XRPC endpoints registered with OAuth authentication") 402 403 routes.RegisterPostRoutes(r, postService, authMiddleware) 404 log.Println("Post XRPC endpoints registered with OAuth authentication") 405 406 // Vote write endpoints removed - clients write directly to their PDS 407 // The AppView indexes votes from Jetstream (see vote consumer above) 408 409 routes.RegisterCommunityFeedRoutes(r, feedService) 410 log.Println("Feed XRPC endpoints registered (public, no auth required)") 411 412 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware) 413 log.Println("Timeline XRPC endpoints registered (requires authentication)") 414 415 routes.RegisterDiscoverRoutes(r, discoverService) 416 log.Println("Discover XRPC endpoints registered (public, no auth required)") 417 418 routes.RegisterAggregatorRoutes(r, aggregatorService) 419 log.Println("Aggregator XRPC endpoints registered (query endpoints public)") 420 421 r.Get("/health", func(w http.ResponseWriter, r *http.Request) { 422 w.WriteHeader(http.StatusOK) 423 if _, err := w.Write([]byte("OK")); err != nil { 424 log.Printf("Failed to write health check response: %v", err) 425 } 426 }) 427 428 port := os.Getenv("APPVIEW_PORT") 429 if port == "" { 430 port = "8081" // Match .env.dev default 431 } 432 433 fmt.Printf("Coves AppView starting on port %s\n", port) 434 fmt.Printf("Default PDS: %s\n", defaultPDS) 435 log.Fatal(http.ListenAndServe(":"+port, r)) 436} 437 438// authenticateWithPDS creates a session on the PDS and returns an access token 439func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 440 type CreateSessionRequest struct { 441 Identifier string `json:"identifier"` 442 Password string `json:"password"` 443 } 444 445 type CreateSessionResponse struct { 446 DID string `json:"did"` 447 Handle string `json:"handle"` 448 AccessJwt string `json:"accessJwt"` 449 } 450 451 reqBody, err := json.Marshal(CreateSessionRequest{ 452 Identifier: handle, 453 Password: password, 454 }) 455 if err != nil { 456 return "", fmt.Errorf("failed to marshal request: %w", err) 457 } 458 459 resp, err := http.Post( 460 pdsURL+"/xrpc/com.atproto.server.createSession", 461 "application/json", 462 bytes.NewReader(reqBody), 463 ) 464 if err != nil { 465 return "", fmt.Errorf("failed to call PDS: %w", err) 466 } 467 defer func() { 468 if closeErr := resp.Body.Close(); closeErr != nil { 469 log.Printf("Failed to close response body: %v", closeErr) 470 } 471 }() 472 473 if resp.StatusCode != http.StatusOK { 474 body, readErr := io.ReadAll(resp.Body) 475 if readErr != nil { 476 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 477 } 478 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 479 } 480 481 var session CreateSessionResponse 482 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 483 return "", fmt.Errorf("failed to decode response: %w", err) 484 } 485 486 return session.AccessJwt, nil 487}