A community based topic aggregation platform built on atproto
1package main 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/api/routes" 6 "Coves/internal/atproto/auth" 7 "Coves/internal/atproto/identity" 8 "Coves/internal/atproto/jetstream" 9 "Coves/internal/core/aggregators" 10 "Coves/internal/core/communities" 11 "Coves/internal/core/communityFeeds" 12 "Coves/internal/core/posts" 13 "Coves/internal/core/users" 14 "bytes" 15 "context" 16 "database/sql" 17 "encoding/json" 18 "fmt" 19 "io" 20 "log" 21 "net/http" 22 "os" 23 "strings" 24 "time" 25 26 "github.com/go-chi/chi/v5" 27 chiMiddleware "github.com/go-chi/chi/v5/middleware" 28 _ "github.com/lib/pq" 29 "github.com/pressly/goose/v3" 30 31 postgresRepo "Coves/internal/db/postgres" 32) 33 34func main() { 35 // Database configuration (AppView database) 36 dbURL := os.Getenv("DATABASE_URL") 37 if dbURL == "" { 38 // Use dev database from .env.dev 39 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 40 } 41 42 // Default PDS URL for this Coves instance (supports self-hosting) 43 defaultPDS := os.Getenv("PDS_URL") 44 if defaultPDS == "" { 45 defaultPDS = "http://localhost:3001" // Local dev PDS 46 } 47 48 db, err := sql.Open("postgres", dbURL) 49 if err != nil { 50 log.Fatal("Failed to connect to database:", err) 51 } 52 defer func() { 53 if closeErr := db.Close(); closeErr != nil { 54 log.Printf("Failed to close database connection: %v", closeErr) 55 } 56 }() 57 58 if err = db.Ping(); err != nil { 59 log.Fatal("Failed to ping database:", err) 60 } 61 62 log.Println("Connected to AppView database") 63 64 // Run migrations 65 if err = goose.SetDialect("postgres"); err != nil { 66 log.Fatal("Failed to set goose dialect:", err) 67 } 68 69 if err = goose.Up(db, "internal/db/migrations"); err != nil { 70 log.Fatal("Failed to run migrations:", err) 71 } 72 73 log.Println("Migrations completed successfully") 74 75 r := chi.NewRouter() 76 77 r.Use(chiMiddleware.Logger) 78 r.Use(chiMiddleware.Recoverer) 79 r.Use(chiMiddleware.RequestID) 80 81 // Rate limiting: 100 requests per minute per IP 82 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 83 r.Use(rateLimiter.Middleware) 84 85 // Initialize identity resolver 86 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 87 // directory as DID registration to ensure E2E tests work without hitting 88 // the production plc.directory 89 identityConfig := identity.DefaultConfig() 90 91 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 92 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 93 if plcDirectoryURL == "" { 94 plcDirectoryURL = "https://plc.directory" // Default to production PLC 95 } 96 97 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 98 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 99 if isDevEnv { 100 identityConfig.PLCURL = plcDirectoryURL 101 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 102 } else { 103 // Production: Allow separate IDENTITY_PLC_URL for read operations 104 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" { 105 identityConfig.PLCURL = identityPLCURL 106 } else { 107 identityConfig.PLCURL = plcDirectoryURL 108 } 109 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 110 } 111 112 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 113 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 114 identityConfig.CacheTTL = duration 115 } 116 } 117 118 identityResolver := identity.NewResolver(db, identityConfig) 119 120 // Initialize atProto auth middleware for JWT validation 121 // Phase 1: Set skipVerify=true to test JWT parsing only 122 // Phase 2: Set skipVerify=false to enable full signature verification 123 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true" 124 if skipVerify { 125 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)") 126 log.Println(" Set AUTH_SKIP_VERIFY=false for production") 127 } 128 129 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour 130 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL) 131 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify) 132 log.Println("✅ atProto auth middleware initialized") 133 134 // Initialize repositories and services 135 userRepo := postgresRepo.NewUserRepository(db) 136 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 137 138 communityRepo := postgresRepo.NewCommunityRepository(db) 139 140 // V2.0: PDS-managed DID generation 141 // Community DIDs and keys are generated entirely by the PDS 142 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 143 144 instanceDID := os.Getenv("INSTANCE_DID") 145 if instanceDID == "" { 146 instanceDID = "did:web:coves.social" // Default for development 147 } 148 149 // V2: Extract instance domain for community handles 150 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 151 // We cannot allow arbitrary domains to prevent impersonation attacks 152 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance 153 // 154 // SECURITY: did:web domain verification is implemented in the Jetstream consumer 155 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim() 156 // Communities with mismatched hostedBy domains are rejected during indexing 157 var instanceDomain string 158 if strings.HasPrefix(instanceDID, "did:web:") { 159 // Extract domain from did:web (this is the authoritative source) 160 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 161 } else { 162 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 163 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 164 if instanceDomain == "" { 165 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 166 } 167 } 168 169 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 170 171 // V2.0: Initialize PDS account provisioner for communities (simplified) 172 // PDS handles all DID and key generation - no Coves-side cryptography needed 173 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 174 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 175 log.Printf(" - Communities will be created at: %s", defaultPDS) 176 log.Printf(" - PDS will generate and manage all DIDs and keys") 177 178 // Initialize community service (no longer needs didGenerator directly) 179 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 180 181 // Authenticate Coves instance with PDS to enable community record writes 182 // The instance needs a PDS account to write community records it owns 183 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 184 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 185 if pdsHandle != "" && pdsPassword != "" { 186 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 187 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 188 if authErr != nil { 189 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 190 log.Println("Community creation will fail until PDS authentication is configured") 191 } else { 192 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 193 svc.SetPDSAccessToken(accessToken) 194 log.Println("✓ Coves instance authenticated with PDS") 195 } 196 } 197 } else { 198 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 199 log.Println("Community creation via write-forward is disabled") 200 } 201 202 // Start Jetstream consumer for read-forward user indexing 203 jetstreamURL := os.Getenv("JETSTREAM_URL") 204 if jetstreamURL == "" { 205 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 206 } 207 208 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 209 210 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 211 ctx := context.Background() 212 go func() { 213 if startErr := userConsumer.Start(ctx); startErr != nil { 214 log.Printf("Jetstream consumer stopped: %v", startErr) 215 } 216 }() 217 218 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 219 220 // Start Jetstream consumer for community events (profiles and subscriptions) 221 // This consumer indexes: 222 // 1. Community profiles (social.coves.community.profile) - in community's own repo 223 // 2. User subscriptions (social.coves.community.subscription) - in user's repo 224 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL") 225 if communityJetstreamURL == "" { 226 // Local Jetstream for communities - filter to our instance's collections 227 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe) 228 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures 229 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription" 230 } 231 232 // Initialize community event consumer with did:web verification 233 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true" 234 if skipDIDWebVerification { 235 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)") 236 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production") 237 } 238 239 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification) 240 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL) 241 242 go func() { 243 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil { 244 log.Printf("Community Jetstream consumer stopped: %v", startErr) 245 } 246 }() 247 248 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL) 249 log.Println(" - Indexing: social.coves.community.profile (community profiles)") 250 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)") 251 252 // Start JWKS cache cleanup background job 253 go func() { 254 ticker := time.NewTicker(1 * time.Hour) 255 defer ticker.Stop() 256 for range ticker.C { 257 jwksFetcher.CleanupExpiredCache() 258 log.Println("JWKS cache cleanup completed") 259 } 260 }() 261 262 log.Println("Started JWKS cache cleanup background job (runs hourly)") 263 264 // Initialize aggregator service 265 aggregatorRepo := postgresRepo.NewAggregatorRepository(db) 266 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService) 267 log.Println("✅ Aggregator service initialized") 268 269 // Initialize post service (with aggregator support) 270 postRepo := postgresRepo.NewPostRepository(db) 271 postService := posts.NewPostService(postRepo, communityService, aggregatorService, defaultPDS) 272 273 // Initialize feed service 274 feedRepo := postgresRepo.NewCommunityFeedRepository(db) 275 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 276 log.Println("✅ Feed service initialized") 277 278 // Start Jetstream consumer for posts 279 // This consumer indexes posts created in community repositories via the firehose 280 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist 281 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL") 282 if postJetstreamURL == "" { 283 // Listen to post record creation events 284 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.post.record" 285 } 286 287 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 288 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL) 289 290 go func() { 291 if startErr := postJetstreamConnector.Start(ctx); startErr != nil { 292 log.Printf("Post Jetstream consumer stopped: %v", startErr) 293 } 294 }() 295 296 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL) 297 log.Println(" - Indexing: social.coves.post.record CREATE operations") 298 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented") 299 300 // Start Jetstream consumer for aggregators 301 // This consumer indexes aggregator service declarations and authorization records 302 // Following Bluesky's pattern for feed generators and labelers 303 // NOTE: Uses the same Jetstream as communities, just filtering different collections 304 aggregatorJetstreamURL := communityJetstreamURL 305 // Override if specific URL needed for testing 306 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" { 307 aggregatorJetstreamURL = envURL 308 } else if aggregatorJetstreamURL == "" { 309 // Fallback if community URL also not set 310 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization" 311 } 312 313 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo) 314 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL) 315 316 go func() { 317 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil { 318 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr) 319 } 320 }() 321 322 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL) 323 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)") 324 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)") 325 326 // Register XRPC routes 327 routes.RegisterUserRoutes(r, userService) 328 routes.RegisterCommunityRoutes(r, communityService, authMiddleware) 329 log.Println("Community XRPC endpoints registered with OAuth authentication") 330 331 routes.RegisterPostRoutes(r, postService, authMiddleware) 332 log.Println("Post XRPC endpoints registered with OAuth authentication") 333 334 routes.RegisterCommunityFeedRoutes(r, feedService) 335 log.Println("Feed XRPC endpoints registered (public, no auth required)") 336 337 routes.RegisterAggregatorRoutes(r, aggregatorService) 338 log.Println("Aggregator XRPC endpoints registered (query endpoints public)") 339 340 r.Get("/health", func(w http.ResponseWriter, r *http.Request) { 341 w.WriteHeader(http.StatusOK) 342 if _, err := w.Write([]byte("OK")); err != nil { 343 log.Printf("Failed to write health check response: %v", err) 344 } 345 }) 346 347 port := os.Getenv("APPVIEW_PORT") 348 if port == "" { 349 port = "8081" // Match .env.dev default 350 } 351 352 fmt.Printf("Coves AppView starting on port %s\n", port) 353 fmt.Printf("Default PDS: %s\n", defaultPDS) 354 log.Fatal(http.ListenAndServe(":"+port, r)) 355} 356 357// authenticateWithPDS creates a session on the PDS and returns an access token 358func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 359 type CreateSessionRequest struct { 360 Identifier string `json:"identifier"` 361 Password string `json:"password"` 362 } 363 364 type CreateSessionResponse struct { 365 DID string `json:"did"` 366 Handle string `json:"handle"` 367 AccessJwt string `json:"accessJwt"` 368 } 369 370 reqBody, err := json.Marshal(CreateSessionRequest{ 371 Identifier: handle, 372 Password: password, 373 }) 374 if err != nil { 375 return "", fmt.Errorf("failed to marshal request: %w", err) 376 } 377 378 resp, err := http.Post( 379 pdsURL+"/xrpc/com.atproto.server.createSession", 380 "application/json", 381 bytes.NewReader(reqBody), 382 ) 383 if err != nil { 384 return "", fmt.Errorf("failed to call PDS: %w", err) 385 } 386 defer func() { 387 if closeErr := resp.Body.Close(); closeErr != nil { 388 log.Printf("Failed to close response body: %v", closeErr) 389 } 390 }() 391 392 if resp.StatusCode != http.StatusOK { 393 body, readErr := io.ReadAll(resp.Body) 394 if readErr != nil { 395 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 396 } 397 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 398 } 399 400 var session CreateSessionResponse 401 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 402 return "", fmt.Errorf("failed to decode response: %w", err) 403 } 404 405 return session.AccessJwt, nil 406}