A community-based topic aggregation platform built on atproto
1package main 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/api/routes" 6 "Coves/internal/atproto/auth" 7 "Coves/internal/atproto/identity" 8 "Coves/internal/atproto/jetstream" 9 "Coves/internal/core/communities" 10 "Coves/internal/core/communityFeeds" 11 "Coves/internal/core/posts" 12 "Coves/internal/core/users" 13 "bytes" 14 "context" 15 "database/sql" 16 "encoding/json" 17 "fmt" 18 "io" 19 "log" 20 "net/http" 21 "os" 22 "strings" 23 "time" 24 25 "github.com/go-chi/chi/v5" 26 chiMiddleware "github.com/go-chi/chi/v5/middleware" 27 _ "github.com/lib/pq" 28 "github.com/pressly/goose/v3" 29 30 postgresRepo "Coves/internal/db/postgres" 31) 32 33func main() { 34 // Database configuration (AppView database) 35 dbURL := os.Getenv("DATABASE_URL") 36 if dbURL == "" { 37 // Use dev database from .env.dev 38 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 39 } 40 41 // Default PDS URL for this Coves instance (supports self-hosting) 42 defaultPDS := os.Getenv("PDS_URL") 43 if defaultPDS == "" { 44 defaultPDS = "http://localhost:3001" // Local dev PDS 45 } 46 47 db, err := sql.Open("postgres", dbURL) 48 if err != nil { 49 log.Fatal("Failed to connect to database:", err) 50 } 51 defer func() { 52 if closeErr := db.Close(); closeErr != nil { 53 log.Printf("Failed to close database connection: %v", closeErr) 54 } 55 }() 56 57 if err = db.Ping(); err != nil { 58 log.Fatal("Failed to ping database:", err) 59 } 60 61 log.Println("Connected to AppView database") 62 63 // Run migrations 64 if err = goose.SetDialect("postgres"); err != nil { 65 log.Fatal("Failed to set goose dialect:", err) 66 } 67 68 if err = goose.Up(db, "internal/db/migrations"); err != nil { 69 log.Fatal("Failed to run migrations:", err) 70 } 71 72 log.Println("Migrations completed successfully") 73 74 r := chi.NewRouter() 75 76 r.Use(chiMiddleware.Logger) 77 r.Use(chiMiddleware.Recoverer) 78 r.Use(chiMiddleware.RequestID) 79 80 // Rate limiting: 100 requests 
per minute per IP 81 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 82 r.Use(rateLimiter.Middleware) 83 84 // Initialize identity resolver 85 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC 86 // directory as DID registration to ensure E2E tests work without hitting 87 // the production plc.directory 88 identityConfig := identity.DefaultConfig() 89 90 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 91 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 92 if plcDirectoryURL == "" { 93 plcDirectoryURL = "https://plc.directory" // Default to production PLC 94 } 95 96 // In dev mode, use PLC_DIRECTORY_URL for identity resolution 97 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL 98 if isDevEnv { 99 identityConfig.PLCURL = plcDirectoryURL 100 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL) 101 } else { 102 // Production: Allow separate IDENTITY_PLC_URL for read operations 103 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" { 104 identityConfig.PLCURL = identityPLCURL 105 } else { 106 identityConfig.PLCURL = plcDirectoryURL 107 } 108 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL) 109 } 110 111 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 112 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil { 113 identityConfig.CacheTTL = duration 114 } 115 } 116 117 identityResolver := identity.NewResolver(db, identityConfig) 118 119 // Initialize atProto auth middleware for JWT validation 120 // Phase 1: Set skipVerify=true to test JWT parsing only 121 // Phase 2: Set skipVerify=false to enable full signature verification 122 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true" 123 if skipVerify { 124 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)") 125 log.Println(" Set AUTH_SKIP_VERIFY=false for production") 126 } 127 128 jwksCacheTTL := 1 * 
time.Hour // Cache public keys for 1 hour 129 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL) 130 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify) 131 log.Println("✅ atProto auth middleware initialized") 132 133 // Initialize repositories and services 134 userRepo := postgresRepo.NewUserRepository(db) 135 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 136 137 communityRepo := postgresRepo.NewCommunityRepository(db) 138 139 // V2.0: PDS-managed DID generation 140 // Community DIDs and keys are generated entirely by the PDS 141 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach) 142 143 instanceDID := os.Getenv("INSTANCE_DID") 144 if instanceDID == "" { 145 instanceDID = "did:web:coves.social" // Default for development 146 } 147 148 // V2: Extract instance domain for community handles 149 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 150 // We cannot allow arbitrary domains to prevent impersonation attacks 151 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance 152 // 153 // SECURITY: did:web domain verification is implemented in the Jetstream consumer 154 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim() 155 // Communities with mismatched hostedBy domains are rejected during indexing 156 var instanceDomain string 157 if strings.HasPrefix(instanceDID, "did:web:") { 158 // Extract domain from did:web (this is the authoritative source) 159 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 160 } else { 161 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 162 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 163 if instanceDomain == "" { 164 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 165 } 166 } 167 168 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 169 170 // V2.0: Initialize PDS account provisioner for communities 
(simplified) 171 // PDS handles all DID and key generation - no Coves-side cryptography needed 172 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS) 173 log.Printf("✅ Community provisioner initialized (PDS-managed keys)") 174 log.Printf(" - Communities will be created at: %s", defaultPDS) 175 log.Printf(" - PDS will generate and manage all DIDs and keys") 176 177 // Initialize community service (no longer needs didGenerator directly) 178 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner) 179 180 // Authenticate Coves instance with PDS to enable community record writes 181 // The instance needs a PDS account to write community records it owns 182 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 183 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 184 if pdsHandle != "" && pdsPassword != "" { 185 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 186 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 187 if authErr != nil { 188 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr) 189 log.Println("Community creation will fail until PDS authentication is configured") 190 } else { 191 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 192 svc.SetPDSAccessToken(accessToken) 193 log.Println("✓ Coves instance authenticated with PDS") 194 } 195 } 196 } else { 197 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 198 log.Println("Community creation via write-forward is disabled") 199 } 200 201 // Start Jetstream consumer for read-forward user indexing 202 jetstreamURL := os.Getenv("JETSTREAM_URL") 203 if jetstreamURL == "" { 204 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 205 } 206 207 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 208 209 userConsumer := 
jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 210 ctx := context.Background() 211 go func() { 212 if startErr := userConsumer.Start(ctx); startErr != nil { 213 log.Printf("Jetstream consumer stopped: %v", startErr) 214 } 215 }() 216 217 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 218 219 // Start Jetstream consumer for community events (profiles and subscriptions) 220 // This consumer indexes: 221 // 1. Community profiles (social.coves.community.profile) - in community's own repo 222 // 2. User subscriptions (social.coves.community.subscription) - in user's repo 223 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL") 224 if communityJetstreamURL == "" { 225 // Local Jetstream for communities - filter to our instance's collections 226 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe) 227 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures 228 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription" 229 } 230 231 // Initialize community event consumer with did:web verification 232 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true" 233 if skipDIDWebVerification { 234 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)") 235 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production") 236 } 237 238 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification) 239 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL) 240 241 go func() { 242 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil { 243 log.Printf("Community Jetstream consumer stopped: %v", startErr) 244 } 245 }() 246 247 log.Printf("Started Jetstream community 
consumer: %s", communityJetstreamURL) 248 log.Println(" - Indexing: social.coves.community.profile (community profiles)") 249 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)") 250 251 // Start JWKS cache cleanup background job 252 go func() { 253 ticker := time.NewTicker(1 * time.Hour) 254 defer ticker.Stop() 255 for range ticker.C { 256 jwksFetcher.CleanupExpiredCache() 257 log.Println("JWKS cache cleanup completed") 258 } 259 }() 260 261 log.Println("Started JWKS cache cleanup background job (runs hourly)") 262 263 // Initialize post service 264 postRepo := postgresRepo.NewPostRepository(db) 265 postService := posts.NewPostService(postRepo, communityService, defaultPDS) 266 267 // Initialize feed service 268 feedRepo := postgresRepo.NewCommunityFeedRepository(db) 269 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService) 270 log.Println("✅ Feed service initialized") 271 272 // Start Jetstream consumer for posts 273 // This consumer indexes posts created in community repositories via the firehose 274 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist 275 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL") 276 if postJetstreamURL == "" { 277 // Listen to post record creation events 278 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.post.record" 279 } 280 281 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService) 282 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL) 283 284 go func() { 285 if startErr := postJetstreamConnector.Start(ctx); startErr != nil { 286 log.Printf("Post Jetstream consumer stopped: %v", startErr) 287 } 288 }() 289 290 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL) 291 log.Println(" - Indexing: social.coves.post.record CREATE operations") 292 log.Println(" - UPDATE/DELETE indexing deferred until those 
features are implemented") 293 294 // Register XRPC routes 295 routes.RegisterUserRoutes(r, userService) 296 routes.RegisterCommunityRoutes(r, communityService, authMiddleware) 297 log.Println("Community XRPC endpoints registered with OAuth authentication") 298 299 routes.RegisterPostRoutes(r, postService, authMiddleware) 300 log.Println("Post XRPC endpoints registered with OAuth authentication") 301 302 routes.RegisterCommunityFeedRoutes(r, feedService) 303 log.Println("Feed XRPC endpoints registered (public, no auth required)") 304 305 r.Get("/health", func(w http.ResponseWriter, r *http.Request) { 306 w.WriteHeader(http.StatusOK) 307 if _, err := w.Write([]byte("OK")); err != nil { 308 log.Printf("Failed to write health check response: %v", err) 309 } 310 }) 311 312 port := os.Getenv("APPVIEW_PORT") 313 if port == "" { 314 port = "8081" // Match .env.dev default 315 } 316 317 fmt.Printf("Coves AppView starting on port %s\n", port) 318 fmt.Printf("Default PDS: %s\n", defaultPDS) 319 log.Fatal(http.ListenAndServe(":"+port, r)) 320} 321 322// authenticateWithPDS creates a session on the PDS and returns an access token 323func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 324 type CreateSessionRequest struct { 325 Identifier string `json:"identifier"` 326 Password string `json:"password"` 327 } 328 329 type CreateSessionResponse struct { 330 DID string `json:"did"` 331 Handle string `json:"handle"` 332 AccessJwt string `json:"accessJwt"` 333 } 334 335 reqBody, err := json.Marshal(CreateSessionRequest{ 336 Identifier: handle, 337 Password: password, 338 }) 339 if err != nil { 340 return "", fmt.Errorf("failed to marshal request: %w", err) 341 } 342 343 resp, err := http.Post( 344 pdsURL+"/xrpc/com.atproto.server.createSession", 345 "application/json", 346 bytes.NewReader(reqBody), 347 ) 348 if err != nil { 349 return "", fmt.Errorf("failed to call PDS: %w", err) 350 } 351 defer func() { 352 if closeErr := resp.Body.Close(); closeErr != nil { 
353 log.Printf("Failed to close response body: %v", closeErr) 354 } 355 }() 356 357 if resp.StatusCode != http.StatusOK { 358 body, readErr := io.ReadAll(resp.Body) 359 if readErr != nil { 360 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr) 361 } 362 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 363 } 364 365 var session CreateSessionResponse 366 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 367 return "", fmt.Errorf("failed to decode response: %w", err) 368 } 369 370 return session.AccessJwt, nil 371}