A community based topic aggregation platform built on atproto
1package main 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/api/routes" 6 "Coves/internal/atproto/auth" 7 "Coves/internal/atproto/identity" 8 "Coves/internal/atproto/jetstream" 9 "Coves/internal/core/aggregators" 10 "Coves/internal/core/communities" 11 "Coves/internal/core/communityFeeds" 12 "Coves/internal/core/discover" 13 "Coves/internal/core/posts" 14 "Coves/internal/core/timeline" 15 "Coves/internal/core/users" 16 "Coves/internal/core/votes" 17 "bytes" 18 "context" 19 "database/sql" 20 "encoding/json" 21 "fmt" 22 "io" 23 "log" 24 "net/http" 25 "os" 26 "strings" 27 "time" 28 29 "github.com/go-chi/chi/v5" 30 chiMiddleware "github.com/go-chi/chi/v5/middleware" 31 _ "github.com/lib/pq" 32 "github.com/pressly/goose/v3" 33 34 postgresRepo "Coves/internal/db/postgres" 35) 36 37func main() { 38 // Database configuration (AppView database) 39 dbURL := os.Getenv("DATABASE_URL") 40 if dbURL == "" { 41 // Use dev database from .env.dev 42 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 43 } 44 45 // Default PDS URL for this Coves instance (supports self-hosting) 46 defaultPDS := os.Getenv("PDS_URL") 47 if defaultPDS == "" { 48 defaultPDS = "http://localhost:3001" // Local dev PDS 49 } 50 51 // Cursor secret for HMAC signing (prevents cursor manipulation) 52 cursorSecret := os.Getenv("CURSOR_SECRET") 53 if cursorSecret == "" { 54 // Generate a random secret if not set (dev mode) 55 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value 56 cursorSecret = "dev-cursor-secret-change-in-production" 57 log.Println("⚠️ WARNING: Using default cursor secret. Set CURSOR_SECRET env var in production!") 58 } 59 60 db, err := sql.Open("postgres", dbURL) 61 if err != nil { 62 log.Fatal("Failed to connect to database:", err) 63 } 64 defer func() { 65 if closeErr := db.Close(); closeErr != nil { 66 log.Printf("Failed to close database connection: %v", closeErr) 67 } 68 }() 69 70 if err = db.Ping(); err != nil { 71 log.Fatal("Failed to ping database:", err) 72 } 73 74 log.Println("Connected to AppView database") 75 76 // Run migrations 77 if err = goose.SetDialect("postgres"); err != nil { 78 log.Fatal("Failed to set goose dialect:", err) 79 } 80 81 if err = goose.Up(db, "internal/db/migrations"); err != nil { 82 log.Fatal("Failed to run migrations:", err) 83 } 84 85 log.Println("Migrations completed successfully") 86 87 r := chi.NewRouter() 88 89 r.Use(chiMiddleware.Logger) 90 r.Use(chiMiddleware.Recoverer) 91 r.Use(chiMiddleware.RequestID) 92 93 // Rate limiting: 100 requests per minute per IP 94 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 95 r.Use(rateLimiter.Middleware) 96 97 // Initialize identity resolver 98 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 99 // directory as DID registration to ensure E2E tests work without hitting 100 // the production plc.directory 101 identityConfig := identity.DefaultConfig() 102 103 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 104 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 105 if plcDirectoryURL == "" { 106 plcDirectoryURL = "https://plc.directory" // Default to production PLC 107 } 108 109 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 110 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 111 if isDevEnv { 112 identityConfig.PLCURL = plcDirectoryURL 113 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 114 } else { 115 // Production: Allow separate IDENTITY_PLC_URL for read operations 116 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" { 117 identityConfig.PLCURL = identityPLCURL 118 } else { 119 identityConfig.PLCURL = plcDirectoryURL 120 } 121 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 122 } 123 124 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 125 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 126 identityConfig.CacheTTL = duration 127 } 128 } 129 130 identityResolver := identity.NewResolver(db, identityConfig) 131 132 // Initialize atProto auth middleware for JWT validation 133 // Phase 1: Set skipVerify=true to test JWT parsing only 134 // Phase 2: Set skipVerify=false to enable full signature verification 135 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true" 136 if skipVerify { 137 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)") 138 log.Println(" Set AUTH_SKIP_VERIFY=false for production") 139 } 140 141 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour 142 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL) 143 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify) 144 log.Println("✅ atProto auth middleware initialized") 145 146 // Initialize repositories and services 147 userRepo := postgresRepo.NewUserRepository(db) 148 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 149 150 communityRepo := postgresRepo.NewCommunityRepository(db) 151 152 // V2.0: PDS-managed DID generation 153 // Community DIDs and keys are generated entirely by the PDS 154 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 155 156 instanceDID := os.Getenv("INSTANCE_DID") 157 if instanceDID == "" { 158 instanceDID = "did:web:coves.social" // Default for development 159 } 160 161 // V2: Extract instance domain for community handles 162 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 163 // We cannot allow arbitrary domains to prevent impersonation attacks 164 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance 165 // 166 // SECURITY: did:web domain verification is implemented in the Jetstream consumer 167 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim() 168 // Communities with mismatched hostedBy domains are rejected during indexing 169 var instanceDomain string 170 if strings.HasPrefix(instanceDID, "did:web:") { 171 // Extract domain from did:web (this is the authoritative source) 172 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 173 } else { 174 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 175 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 176 if instanceDomain == "" { 177 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 178 } 179 } 180 181 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 182 183 // V2.0: Initialize PDS account provisioner for communities (simplified) 184 // PDS handles all DID and key generation - no Coves-side cryptography needed 185 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 186 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 187 log.Printf(" - Communities will be created at: %s", defaultPDS) 188 log.Printf(" - PDS will generate and manage all DIDs and keys") 189 190 // Initialize community service (no longer needs didGenerator directly) 191 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 192 193 // Authenticate Coves instance with PDS to enable community record writes 194 // The instance needs a PDS account to write community records it owns 195 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 196 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 197 if pdsHandle != "" && pdsPassword != "" { 198 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 199 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 200 if authErr != nil { 201 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 202 log.Println("Community creation will fail until PDS authentication is configured") 203 } else { 204 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 205 svc.SetPDSAccessToken(accessToken) 206 log.Println("✓ Coves instance authenticated with PDS") 207 } 208 } 209 } else { 210 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 211 log.Println("Community creation via write-forward is disabled") 212 } 213 214 // Start Jetstream consumer for read-forward user indexing 215 jetstreamURL := os.Getenv("JETSTREAM_URL") 216 if jetstreamURL == "" { 217 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 218 } 219 220 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 221 222 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 223 ctx := context.Background() 224 go func() { 225 if startErr := userConsumer.Start(ctx); startErr != nil { 226 log.Printf("Jetstream consumer stopped: %v", startErr) 227 } 228 }() 229 230 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 231 232 // Start Jetstream consumer for community events (profiles and subscriptions) 233 // This consumer indexes: 234 // 1. Community profiles (social.coves.community.profile) - in community's own repo 235 // 2. User subscriptions (social.coves.community.subscription) - in user's repo 236 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL") 237 if communityJetstreamURL == "" { 238 // Local Jetstream for communities - filter to our instance's collections 239 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe) 240 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures 241 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription" 242 } 243 244 // Initialize community event consumer with did:web verification 245 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true" 246 if skipDIDWebVerification { 247 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)") 248 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production") 249 } 250 251 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification) 252 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL) 253 254 go func() { 255 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil { 256 log.Printf("Community Jetstream consumer stopped: %v", startErr) 257 } 258 }() 259 260 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL) 261 log.Println(" - Indexing: social.coves.community.profile (community profiles)") 262 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)") 263 264 // Start JWKS cache cleanup background job 265 go func() { 266 ticker := time.NewTicker(1 * time.Hour) 267 defer ticker.Stop() 268 for range ticker.C { 269 jwksFetcher.CleanupExpiredCache() 270 log.Println("JWKS cache cleanup completed") 271 } 272 }() 273 274 log.Println("Started JWKS cache cleanup background job (runs hourly)") 275 276 // Initialize aggregator service 277 aggregatorRepo := postgresRepo.NewAggregatorRepository(db) 278 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 279 log.Println("✅ Aggregator service initialized") 280 281 // Initialize post service (with aggregator support) 282 postRepo := postgresRepo.NewPostRepository(db) 283 postService := posts.NewPostService(postRepo, communityService, aggregatorService, defaultPDS) 284 285 // Initialize vote service 286 voteRepo := postgresRepo.NewVoteRepository(db) 287 voteService := votes.NewVoteService(voteRepo, postRepo, defaultPDS) 288 log.Println("✅ Vote service initialized") 289 290 // Initialize feed service 291 feedRepo := postgresRepo.NewCommunityFeedRepository(db) 292 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 293 log.Println("✅ Feed service initialized") 294 295 // Initialize timeline service (home feed from subscribed communities) 296 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret) 297 timelineService := timeline.NewTimelineService(timelineRepo) 298 log.Println("✅ Timeline service initialized") 299 300 // Initialize discover service (public feed from all communities) 301 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret) 302 discoverService := discover.NewDiscoverService(discoverRepo) 303 log.Println("✅ Discover service initialized") 304 305 // Start Jetstream consumer for posts 306 // This consumer indexes posts created in community repositories via the firehose 307 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist 308 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL") 309 if postJetstreamURL == "" { 310 // Listen to post record creation events 311 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.post.record" 312 } 313 314 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 315 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL) 316 317 go func() { 318 if startErr := postJetstreamConnector.Start(ctx); startErr != nil { 319 log.Printf("Post Jetstream consumer stopped: %v", startErr) 320 } 321 }() 322 323 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL) 324 log.Println(" - Indexing: social.coves.post.record CREATE operations") 325 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented") 326 327 // Start Jetstream consumer for aggregators 328 // This consumer indexes aggregator service declarations and authorization records 329 // Following Bluesky's pattern for feed generators and labelers 330 // NOTE: Uses the same Jetstream as communities, just filtering different collections 331 aggregatorJetstreamURL := communityJetstreamURL 332 // Override if specific URL needed for testing 333 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" { 334 aggregatorJetstreamURL = envURL 335 } else if aggregatorJetstreamURL == "" { 336 // Fallback if community URL also not set 337 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization" 338 } 339 340 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo) 341 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL) 342 343 go func() { 344 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil { 345 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr) 346 } 347 }() 348 349 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL) 350 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)") 351 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)") 352 353 // Start Jetstream consumer for votes 354 // This consumer indexes votes from user repositories and updates post vote counts 355 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL") 356 if voteJetstreamURL == "" { 357 // Listen to vote record CREATE/DELETE events from user repositories 358 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.interaction.vote" 359 } 360 361 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db) 362 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL) 363 364 go func() { 365 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil { 366 log.Printf("Vote Jetstream consumer stopped: %v", startErr) 367 } 368 }() 369 370 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL) 371 log.Println(" - Indexing: social.coves.interaction.vote CREATE/DELETE operations") 372 log.Println(" - Updating: Post vote counts atomically") 373 374 // Register XRPC routes 375 routes.RegisterUserRoutes(r, userService) 376 routes.RegisterCommunityRoutes(r, communityService, authMiddleware) 377 log.Println("Community XRPC endpoints registered with OAuth authentication") 378 379 routes.RegisterPostRoutes(r, postService, authMiddleware) 380 log.Println("Post XRPC endpoints registered with OAuth authentication") 381 382 routes.RegisterVoteRoutes(r, voteService, authMiddleware) 383 log.Println("Vote XRPC endpoints registered with OAuth authentication") 384 385 routes.RegisterCommunityFeedRoutes(r, feedService) 386 log.Println("Feed XRPC endpoints registered (public, no auth required)") 387 388 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware) 389 log.Println("Timeline XRPC endpoints registered (requires authentication)") 390 391 routes.RegisterDiscoverRoutes(r, discoverService) 392 log.Println("Discover XRPC endpoints registered (public, no auth required)") 393 394 routes.RegisterAggregatorRoutes(r, aggregatorService) 395 log.Println("Aggregator XRPC endpoints registered (query endpoints public)") 396 397 r.Get("/health", func(w http.ResponseWriter, r *http.Request) { 398 w.WriteHeader(http.StatusOK) 399 if _, err := w.Write([]byte("OK")); err != nil { 400 log.Printf("Failed to write health check response: %v", err) 401 } 402 }) 403 404 port := os.Getenv("APPVIEW_PORT") 405 if port == "" { 406 port = "8081" // Match .env.dev default 407 } 408 409 fmt.Printf("Coves AppView starting on port %s\n", port) 410 fmt.Printf("Default PDS: %s\n", defaultPDS) 411 log.Fatal(http.ListenAndServe(":"+port, r)) 412} 413 414// authenticateWithPDS creates a session on the PDS and returns an access token 415func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 416 type CreateSessionRequest struct { 417 Identifier string `json:"identifier"` 418 Password string `json:"password"` 419 } 420 421 type CreateSessionResponse struct { 422 DID string `json:"did"` 423 Handle string `json:"handle"` 424 AccessJwt string `json:"accessJwt"` 425 } 426 427 reqBody, err := json.Marshal(CreateSessionRequest{ 428 Identifier: handle, 429 Password: password, 430 }) 431 if err != nil { 432 return "", fmt.Errorf("failed to marshal request: %w", err) 433 } 434 435 resp, err := http.Post( 436 pdsURL+"/xrpc/com.atproto.server.createSession", 437 "application/json", 438 bytes.NewReader(reqBody), 439 ) 440 if err != nil { 441 return "", fmt.Errorf("failed to call PDS: %w", err) 442 } 443 defer func() { 444 if closeErr := resp.Body.Close(); closeErr != nil { 445 log.Printf("Failed to close response body: %v", closeErr) 446 } 447 }() 448 449 if resp.StatusCode != http.StatusOK { 450 body, readErr := io.ReadAll(resp.Body) 451 if readErr != nil { 452 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 453 } 454 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 455 } 456 457 var session CreateSessionResponse 458 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 459 return "", fmt.Errorf("failed to decode response: %w", err) 460 } 461 462 return session.AccessJwt, nil 463}