A community based topic aggregation platform built on atproto
1package main 2 3import ( 4 "bytes" 5 "context" 6 "database/sql" 7 "encoding/json" 8 "fmt" 9 "io" 10 "log" 11 "net/http" 12 "os" 13 "strings" 14 "time" 15 16 "github.com/go-chi/chi/v5" 17 chiMiddleware "github.com/go-chi/chi/v5/middleware" 18 _ "github.com/lib/pq" 19 "github.com/pressly/goose/v3" 20 21 "Coves/internal/api/handlers/oauth" 22 "Coves/internal/api/middleware" 23 "Coves/internal/api/routes" 24 "Coves/internal/atproto/did" 25 "Coves/internal/atproto/identity" 26 "Coves/internal/atproto/jetstream" 27 "Coves/internal/core/communities" 28 oauthCore "Coves/internal/core/oauth" 29 "Coves/internal/core/users" 30 postgresRepo "Coves/internal/db/postgres" 31) 32 33func main() { 34 // Database configuration (AppView database) 35 dbURL := os.Getenv("DATABASE_URL") 36 if dbURL == "" { 37 // Use dev database from .env.dev 38 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable" 39 } 40 41 // Default PDS URL for this Coves instance (supports self-hosting) 42 defaultPDS := os.Getenv("PDS_URL") 43 if defaultPDS == "" { 44 defaultPDS = "http://localhost:3001" // Local dev PDS 45 } 46 47 db, err := sql.Open("postgres", dbURL) 48 if err != nil { 49 log.Fatal("Failed to connect to database:", err) 50 } 51 defer db.Close() 52 53 if err := db.Ping(); err != nil { 54 log.Fatal("Failed to ping database:", err) 55 } 56 57 log.Println("Connected to AppView database") 58 59 // Run migrations 60 if err := goose.SetDialect("postgres"); err != nil { 61 log.Fatal("Failed to set goose dialect:", err) 62 } 63 64 if err := goose.Up(db, "internal/db/migrations"); err != nil { 65 log.Fatal("Failed to run migrations:", err) 66 } 67 68 log.Println("Migrations completed successfully") 69 70 r := chi.NewRouter() 71 72 r.Use(chiMiddleware.Logger) 73 r.Use(chiMiddleware.Recoverer) 74 r.Use(chiMiddleware.RequestID) 75 76 // Rate limiting: 100 requests per minute per IP 77 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 78 r.Use(rateLimiter.Middleware) 79 80 // Initialize identity resolver 81 identityConfig := identity.DefaultConfig() 82 // Override from environment if set 83 if plcURL := os.Getenv("IDENTITY_PLC_URL"); plcURL != "" { 84 identityConfig.PLCURL = plcURL 85 } 86 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 87 if duration, err := time.ParseDuration(cacheTTL); err == nil { 88 identityConfig.CacheTTL = duration 89 } 90 } 91 92 identityResolver := identity.NewResolver(db, identityConfig) 93 log.Println("Identity resolver initialized with PLC:", identityConfig.PLCURL) 94 95 // Initialize OAuth session store 96 sessionStore := oauthCore.NewPostgresSessionStore(db) 97 log.Println("OAuth session store initialized") 98 99 // Initialize repositories and services 100 userRepo := postgresRepo.NewUserRepository(db) 101 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 102 103 communityRepo := postgresRepo.NewCommunityRepository(db) 104 105 // Initialize DID generator for communities 106 // IS_DEV_ENV=true: Generate did:plc:xxx without registering to PLC directory 107 // IS_DEV_ENV=false: Generate did:plc:xxx and register with PLC_DIRECTORY_URL 108 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 109 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 110 if plcDirectoryURL == "" { 111 plcDirectoryURL = "https://plc.directory" // Default to Bluesky's PLC 112 } 113 didGenerator := did.NewGenerator(isDevEnv, plcDirectoryURL) 114 log.Printf("DID generator initialized (dev_mode=%v, plc_url=%s)", isDevEnv, plcDirectoryURL) 115 116 instanceDID := os.Getenv("INSTANCE_DID") 117 if instanceDID == "" { 118 instanceDID = "did:web:coves.social" // Default for development 119 } 120 121 // V2: Extract instance domain for community handles 122 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security 123 // We cannot allow arbitrary domains to prevent impersonation attacks 124 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance 125 // 126 // TODO (Security - V2.1): Implement did:web domain verification 127 // Currently, any self-hoster can set INSTANCE_DID=did:web:nintendo.com without 128 // actually owning nintendo.com. This allows domain impersonation attacks. 129 // Solution: Verify domain ownership by fetching https://domain/.well-known/did.json 130 // and ensuring it matches the claimed DID. See: https://atproto.com/specs/did-web 131 // Alternatively, switch to did:plc for instance DIDs (cryptographically unique). 132 var instanceDomain string 133 if strings.HasPrefix(instanceDID, "did:web:") { 134 // Extract domain from did:web (this is the authoritative source) 135 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:") 136 } else { 137 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN 138 instanceDomain = os.Getenv("INSTANCE_DOMAIN") 139 if instanceDomain == "" { 140 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs") 141 } 142 } 143 144 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID) 145 146 // V2: Initialize PDS account provisioner for communities 147 provisioner := communities.NewPDSAccountProvisioner(userService, instanceDomain, defaultPDS) 148 149 communityService := communities.NewCommunityService(communityRepo, didGenerator, defaultPDS, instanceDID, instanceDomain, provisioner) 150 151 // Authenticate Coves instance with PDS to enable community record writes 152 // The instance needs a PDS account to write community records it owns 153 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 154 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 155 if pdsHandle != "" && pdsPassword != "" { 156 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 157 accessToken, err := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 158 if err != nil { 159 log.Printf("Warning: Failed to authenticate with PDS: %v", err) 160 log.Println("Community creation will fail until PDS authentication is configured") 161 } else { 162 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 163 svc.SetPDSAccessToken(accessToken) 164 log.Println("✓ Coves instance authenticated with PDS") 165 } 166 } 167 } else { 168 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 169 log.Println("Community creation via write-forward is disabled") 170 } 171 172 // Start Jetstream consumer for read-forward user indexing 173 jetstreamURL := os.Getenv("JETSTREAM_URL") 174 if jetstreamURL == "" { 175 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 176 } 177 178 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 179 180 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 181 ctx := context.Background() 182 go func() { 183 if err := userConsumer.Start(ctx); err != nil { 184 log.Printf("Jetstream consumer stopped: %v", err) 185 } 186 }() 187 188 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 189 190 // Note: Community indexing happens through the same Jetstream firehose 191 // The CommunityEventConsumer is used by handlers when processing community-related events 192 // For now, community records are created via write-forward to PDS, then indexed when 193 // they appear in the firehose. A dedicated consumer can be added later if needed. 194 log.Println("Community event consumer initialized (processes events from firehose)") 195 196 // Start OAuth cleanup background job 197 go func() { 198 ticker := time.NewTicker(1 * time.Hour) 199 defer ticker.Stop() 200 for range ticker.C { 201 if pgStore, ok := sessionStore.(*oauthCore.PostgresSessionStore); ok { 202 _ = pgStore.CleanupExpiredRequests(ctx) 203 _ = pgStore.CleanupExpiredSessions(ctx) 204 log.Println("OAuth cleanup completed") 205 } 206 } 207 }() 208 209 log.Println("Started OAuth cleanup background job (runs hourly)") 210 211 // Initialize OAuth cookie store (singleton) 212 cookieSecret, err := oauth.GetEnvBase64OrPlain("OAUTH_COOKIE_SECRET") 213 if err != nil { 214 log.Fatalf("Failed to load OAUTH_COOKIE_SECRET: %v", err) 215 } 216 if cookieSecret == "" { 217 log.Fatal("OAUTH_COOKIE_SECRET not configured") 218 } 219 220 if err := oauth.InitCookieStore(cookieSecret); err != nil { 221 log.Fatalf("Failed to initialize cookie store: %v", err) 222 } 223 224 // Initialize OAuth handlers 225 loginHandler := oauth.NewLoginHandler(identityResolver, sessionStore) 226 callbackHandler := oauth.NewCallbackHandler(sessionStore) 227 logoutHandler := oauth.NewLogoutHandler(sessionStore) 228 229 // OAuth routes (public endpoints) 230 r.Post("/oauth/login", loginHandler.HandleLogin) 231 r.Get("/oauth/callback", callbackHandler.HandleCallback) 232 r.Post("/oauth/logout", logoutHandler.HandleLogout) 233 r.Get("/oauth/client-metadata.json", oauth.HandleClientMetadata) 234 r.Get("/oauth/jwks.json", oauth.HandleJWKS) 235 236 log.Println("OAuth endpoints registered") 237 238 // Register XRPC routes 239 routes.RegisterUserRoutes(r, userService) 240 routes.RegisterCommunityRoutes(r, communityService) 241 log.Println("Community XRPC endpoints registered") 242 243 r.Get("/health", func(w http.ResponseWriter, r *http.Request) { 244 w.WriteHeader(http.StatusOK) 245 w.Write([]byte("OK")) 246 }) 247 248 port := os.Getenv("APPVIEW_PORT") 249 if port == "" { 250 port = "8081" // Match .env.dev default 251 } 252 253 fmt.Printf("Coves AppView starting on port %s\n", port) 254 fmt.Printf("Default PDS: %s\n", defaultPDS) 255 log.Fatal(http.ListenAndServe(":"+port, r)) 256} 257 258// authenticateWithPDS creates a session on the PDS and returns an access token 259func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 260 type CreateSessionRequest struct { 261 Identifier string `json:"identifier"` 262 Password string `json:"password"` 263 } 264 265 type CreateSessionResponse struct { 266 DID string `json:"did"` 267 Handle string `json:"handle"` 268 AccessJwt string `json:"accessJwt"` 269 } 270 271 reqBody, err := json.Marshal(CreateSessionRequest{ 272 Identifier: handle, 273 Password: password, 274 }) 275 if err != nil { 276 return "", fmt.Errorf("failed to marshal request: %w", err) 277 } 278 279 resp, err := http.Post( 280 pdsURL+"/xrpc/com.atproto.server.createSession", 281 "application/json", 282 bytes.NewReader(reqBody), 283 ) 284 if err != nil { 285 return "", fmt.Errorf("failed to call PDS: %w", err) 286 } 287 defer resp.Body.Close() 288 289 if resp.StatusCode != http.StatusOK { 290 body, _ := io.ReadAll(resp.Body) 291 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 292 } 293 294 var session CreateSessionResponse 295 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 296 return "", fmt.Errorf("failed to decode response: %w", err) 297 } 298 299 return session.AccessJwt, nil 300}