A community-based topic aggregation platform built on atproto
1package main
2
3import (
4 "Coves/internal/api/handlers/oauth"
5 "Coves/internal/api/middleware"
6 "Coves/internal/api/routes"
7 "Coves/internal/atproto/identity"
8 "Coves/internal/atproto/jetstream"
9 "Coves/internal/core/communities"
10 "Coves/internal/core/users"
11 "bytes"
12 "context"
13 "database/sql"
14 "encoding/json"
15 "fmt"
16 "io"
17 "log"
18 "net/http"
19 "os"
20 "strings"
21 "time"
22
23 "github.com/go-chi/chi/v5"
24 chiMiddleware "github.com/go-chi/chi/v5/middleware"
25 _ "github.com/lib/pq"
26 "github.com/pressly/goose/v3"
27
28 oauthCore "Coves/internal/core/oauth"
29
30 postgresRepo "Coves/internal/db/postgres"
31)
32
33func main() {
34 // Database configuration (AppView database)
35 dbURL := os.Getenv("DATABASE_URL")
36 if dbURL == "" {
37 // Use dev database from .env.dev
38 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
39 }
40
41 // Default PDS URL for this Coves instance (supports self-hosting)
42 defaultPDS := os.Getenv("PDS_URL")
43 if defaultPDS == "" {
44 defaultPDS = "http://localhost:3001" // Local dev PDS
45 }
46
47 db, err := sql.Open("postgres", dbURL)
48 if err != nil {
49 log.Fatal("Failed to connect to database:", err)
50 }
51 defer func() {
52 if closeErr := db.Close(); closeErr != nil {
53 log.Printf("Failed to close database connection: %v", closeErr)
54 }
55 }()
56
57 if err = db.Ping(); err != nil {
58 log.Fatal("Failed to ping database:", err)
59 }
60
61 log.Println("Connected to AppView database")
62
63 // Run migrations
64 if err = goose.SetDialect("postgres"); err != nil {
65 log.Fatal("Failed to set goose dialect:", err)
66 }
67
68 if err = goose.Up(db, "internal/db/migrations"); err != nil {
69 log.Fatal("Failed to run migrations:", err)
70 }
71
72 log.Println("Migrations completed successfully")
73
74 r := chi.NewRouter()
75
76 r.Use(chiMiddleware.Logger)
77 r.Use(chiMiddleware.Recoverer)
78 r.Use(chiMiddleware.RequestID)
79
80 // Rate limiting: 100 requests per minute per IP
81 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
82 r.Use(rateLimiter.Middleware)
83
84 // Initialize identity resolver
85 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC
86 // directory as DID registration to ensure E2E tests work without hitting
87 // the production plc.directory
88 identityConfig := identity.DefaultConfig()
89
90 isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
91 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
92 if plcDirectoryURL == "" {
93 plcDirectoryURL = "https://plc.directory" // Default to production PLC
94 }
95
96 // In dev mode, use PLC_DIRECTORY_URL for identity resolution
97 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL
98 if isDevEnv {
99 identityConfig.PLCURL = plcDirectoryURL
100 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL)
101 } else {
102 // Production: Allow separate IDENTITY_PLC_URL for read operations
103 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" {
104 identityConfig.PLCURL = identityPLCURL
105 } else {
106 identityConfig.PLCURL = plcDirectoryURL
107 }
108 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL)
109 }
110
111 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
112 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil {
113 identityConfig.CacheTTL = duration
114 }
115 }
116
117 identityResolver := identity.NewResolver(db, identityConfig)
118
119 // Initialize OAuth session store
120 sessionStore := oauthCore.NewPostgresSessionStore(db)
121 log.Println("OAuth session store initialized")
122
123 // Initialize repositories and services
124 userRepo := postgresRepo.NewUserRepository(db)
125 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
126
127 communityRepo := postgresRepo.NewCommunityRepository(db)
128
129 // V2.0: PDS-managed DID generation
130 // Community DIDs and keys are generated entirely by the PDS
131 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach)
132
133 instanceDID := os.Getenv("INSTANCE_DID")
134 if instanceDID == "" {
135 instanceDID = "did:web:coves.social" // Default for development
136 }
137
138 // V2: Extract instance domain for community handles
139 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security
140 // We cannot allow arbitrary domains to prevent impersonation attacks
141 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance
142 //
143 // TODO (Security - V2.1): Implement did:web domain verification
144 // Currently, any self-hoster can set INSTANCE_DID=did:web:nintendo.com without
145 // actually owning nintendo.com. This allows domain impersonation attacks.
146 // Solution: Verify domain ownership by fetching https://domain/.well-known/did.json
147 // and ensuring it matches the claimed DID. See: https://atproto.com/specs/did-web
148 // Alternatively, switch to did:plc for instance DIDs (cryptographically unique).
149 var instanceDomain string
150 if strings.HasPrefix(instanceDID, "did:web:") {
151 // Extract domain from did:web (this is the authoritative source)
152 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
153 } else {
154 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN
155 instanceDomain = os.Getenv("INSTANCE_DOMAIN")
156 if instanceDomain == "" {
157 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs")
158 }
159 }
160
161 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID)
162
163 // V2.0: Initialize PDS account provisioner for communities (simplified)
164 // PDS handles all DID and key generation - no Coves-side cryptography needed
165 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS)
166 log.Printf("✅ Community provisioner initialized (PDS-managed keys)")
167 log.Printf(" - Communities will be created at: %s", defaultPDS)
168 log.Printf(" - PDS will generate and manage all DIDs and keys")
169
170 // Initialize community service (no longer needs didGenerator directly)
171 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner)
172
173 // Authenticate Coves instance with PDS to enable community record writes
174 // The instance needs a PDS account to write community records it owns
175 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
176 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
177 if pdsHandle != "" && pdsPassword != "" {
178 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
179 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
180 if authErr != nil {
181 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr)
182 log.Println("Community creation will fail until PDS authentication is configured")
183 } else {
184 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
185 svc.SetPDSAccessToken(accessToken)
186 log.Println("✓ Coves instance authenticated with PDS")
187 }
188 }
189 } else {
190 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
191 log.Println("Community creation via write-forward is disabled")
192 }
193
194 // Start Jetstream consumer for read-forward user indexing
195 jetstreamURL := os.Getenv("JETSTREAM_URL")
196 if jetstreamURL == "" {
197 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
198 }
199
200 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
201
202 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
203 ctx := context.Background()
204 go func() {
205 if startErr := userConsumer.Start(ctx); startErr != nil {
206 log.Printf("Jetstream consumer stopped: %v", startErr)
207 }
208 }()
209
210 log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
211
212 // Note: Community indexing happens through the same Jetstream firehose
213 // The CommunityEventConsumer is used by handlers when processing community-related events
214 // For now, community records are created via write-forward to PDS, then indexed when
215 // they appear in the firehose. A dedicated consumer can be added later if needed.
216 log.Println("Community event consumer initialized (processes events from firehose)")
217
218 // Start OAuth cleanup background job
219 go func() {
220 ticker := time.NewTicker(1 * time.Hour)
221 defer ticker.Stop()
222 for range ticker.C {
223 if pgStore, ok := sessionStore.(*oauthCore.PostgresSessionStore); ok {
224 if cleanupErr := pgStore.CleanupExpiredRequests(ctx); cleanupErr != nil {
225 log.Printf("Failed to cleanup expired OAuth requests: %v", cleanupErr)
226 }
227 if cleanupErr := pgStore.CleanupExpiredSessions(ctx); cleanupErr != nil {
228 log.Printf("Failed to cleanup expired OAuth sessions: %v", cleanupErr)
229 }
230 log.Println("OAuth cleanup completed")
231 }
232 }
233 }()
234
235 log.Println("Started OAuth cleanup background job (runs hourly)")
236
237 // Initialize OAuth cookie store (singleton)
238 cookieSecret, err := oauth.GetEnvBase64OrPlain("OAUTH_COOKIE_SECRET")
239 if err != nil {
240 log.Fatalf("Failed to load OAUTH_COOKIE_SECRET: %v", err)
241 }
242 if cookieSecret == "" {
243 log.Fatal("OAUTH_COOKIE_SECRET not configured")
244 }
245
246 if err := oauth.InitCookieStore(cookieSecret); err != nil {
247 log.Fatalf("Failed to initialize cookie store: %v", err)
248 }
249
250 // Initialize OAuth handlers
251 loginHandler := oauth.NewLoginHandler(identityResolver, sessionStore)
252 callbackHandler := oauth.NewCallbackHandler(sessionStore)
253 logoutHandler := oauth.NewLogoutHandler(sessionStore)
254
255 // OAuth routes (public endpoints)
256 r.Post("/oauth/login", loginHandler.HandleLogin)
257 r.Get("/oauth/callback", callbackHandler.HandleCallback)
258 r.Post("/oauth/logout", logoutHandler.HandleLogout)
259 r.Get("/oauth/client-metadata.json", oauth.HandleClientMetadata)
260 r.Get("/oauth/jwks.json", oauth.HandleJWKS)
261
262 log.Println("OAuth endpoints registered")
263
264 // Register XRPC routes
265 routes.RegisterUserRoutes(r, userService)
266 routes.RegisterCommunityRoutes(r, communityService)
267 log.Println("Community XRPC endpoints registered")
268
269 r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
270 w.WriteHeader(http.StatusOK)
271 if _, err := w.Write([]byte("OK")); err != nil {
272 log.Printf("Failed to write health check response: %v", err)
273 }
274 })
275
276 port := os.Getenv("APPVIEW_PORT")
277 if port == "" {
278 port = "8081" // Match .env.dev default
279 }
280
281 fmt.Printf("Coves AppView starting on port %s\n", port)
282 fmt.Printf("Default PDS: %s\n", defaultPDS)
283 log.Fatal(http.ListenAndServe(":"+port, r))
284}
285
// authenticateWithPDS creates a session on the PDS by POSTing the given
// credentials to com.atproto.server.createSession and returns the access
// token (accessJwt) from the response.
//
// pdsURL is the base URL of the PDS; trailing slashes are ignored. handle is
// sent as the session identifier and password as the account password.
//
// An error is returned when the request cannot be built or sent, when the PDS
// does not answer within the client timeout, when it responds with a status
// other than 200 (the error includes a bounded excerpt of the response body),
// when the response is not valid JSON, or when it carries no access token.
func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
	type CreateSessionRequest struct {
		Identifier string `json:"identifier"`
		Password   string `json:"password"`
	}

	type CreateSessionResponse struct {
		DID       string `json:"did"`
		Handle    string `json:"handle"`
		AccessJwt string `json:"accessJwt"`
	}

	const (
		// requestTimeout bounds the whole exchange so an unresponsive PDS
		// cannot block the caller forever (http.DefaultClient has no timeout).
		requestTimeout = 30 * time.Second
		// maxErrorBodyBytes caps how much of an error response is read and
		// echoed into the returned error.
		maxErrorBodyBytes = 4 << 10
	)

	reqBody, err := json.Marshal(CreateSessionRequest{
		Identifier: handle,
		Password:   password,
	})
	if err != nil {
		return "", fmt.Errorf("failed to marshal request: %w", err)
	}

	// Trim trailing slashes so "http://host/" does not yield "//xrpc/...".
	endpoint := strings.TrimRight(pdsURL, "/") + "/xrpc/com.atproto.server.createSession"

	client := &http.Client{Timeout: requestTimeout}
	resp, err := client.Post(endpoint, "application/json", bytes.NewReader(reqBody))
	if err != nil {
		return "", fmt.Errorf("failed to call PDS: %w", err)
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			log.Printf("Failed to close response body: %v", closeErr)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes))
		if readErr != nil {
			return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr)
		}
		return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
	}

	var session CreateSessionResponse
	if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
		return "", fmt.Errorf("failed to decode response: %w", err)
	}

	// A 200 without a token would otherwise be reported as success and
	// surface later as an opaque authorization failure.
	if session.AccessJwt == "" {
		return "", fmt.Errorf("PDS returned status %d but no access token", resp.StatusCode)
	}

	return session.AccessJwt, nil
}