A community based topic aggregation platform built on atproto
1package main
2
3import (
4 "Coves/internal/api/handlers/oauth"
5 "Coves/internal/api/middleware"
6 "Coves/internal/api/routes"
7 "Coves/internal/atproto/did"
8 "Coves/internal/atproto/identity"
9 "Coves/internal/atproto/jetstream"
10 "Coves/internal/core/communities"
11 "Coves/internal/core/users"
12 "bytes"
13 "context"
14 "database/sql"
15 "encoding/json"
16 "fmt"
17 "io"
18 "log"
19 "net/http"
20 "os"
21 "strings"
22 "time"
23
24 "github.com/go-chi/chi/v5"
25 chiMiddleware "github.com/go-chi/chi/v5/middleware"
26 _ "github.com/lib/pq"
27 "github.com/pressly/goose/v3"
28
29 oauthCore "Coves/internal/core/oauth"
30
31 postgresRepo "Coves/internal/db/postgres"
32)
33
34func main() {
35 // Database configuration (AppView database)
36 dbURL := os.Getenv("DATABASE_URL")
37 if dbURL == "" {
38 // Use dev database from .env.dev
39 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
40 }
41
42 // Default PDS URL for this Coves instance (supports self-hosting)
43 defaultPDS := os.Getenv("PDS_URL")
44 if defaultPDS == "" {
45 defaultPDS = "http://localhost:3001" // Local dev PDS
46 }
47
48 db, err := sql.Open("postgres", dbURL)
49 if err != nil {
50 log.Fatal("Failed to connect to database:", err)
51 }
52 defer func() {
53 if closeErr := db.Close(); closeErr != nil {
54 log.Printf("Failed to close database connection: %v", closeErr)
55 }
56 }()
57
58 if err = db.Ping(); err != nil {
59 log.Fatal("Failed to ping database:", err)
60 }
61
62 log.Println("Connected to AppView database")
63
64 // Run migrations
65 if err = goose.SetDialect("postgres"); err != nil {
66 log.Fatal("Failed to set goose dialect:", err)
67 }
68
69 if err = goose.Up(db, "internal/db/migrations"); err != nil {
70 log.Fatal("Failed to run migrations:", err)
71 }
72
73 log.Println("Migrations completed successfully")
74
75 r := chi.NewRouter()
76
77 r.Use(chiMiddleware.Logger)
78 r.Use(chiMiddleware.Recoverer)
79 r.Use(chiMiddleware.RequestID)
80
81 // Rate limiting: 100 requests per minute per IP
82 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
83 r.Use(rateLimiter.Middleware)
84
85 // Initialize identity resolver
86 identityConfig := identity.DefaultConfig()
87 // Override from environment if set
88 if plcURL := os.Getenv("IDENTITY_PLC_URL"); plcURL != "" {
89 identityConfig.PLCURL = plcURL
90 }
91 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
92 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil {
93 identityConfig.CacheTTL = duration
94 }
95 }
96
97 identityResolver := identity.NewResolver(db, identityConfig)
98 log.Println("Identity resolver initialized with PLC:", identityConfig.PLCURL)
99
100 // Initialize OAuth session store
101 sessionStore := oauthCore.NewPostgresSessionStore(db)
102 log.Println("OAuth session store initialized")
103
104 // Initialize repositories and services
105 userRepo := postgresRepo.NewUserRepository(db)
106 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
107
108 communityRepo := postgresRepo.NewCommunityRepository(db)
109
110 // Initialize DID generator for communities
111 // IS_DEV_ENV=true: Generate did:plc:xxx without registering to PLC directory
112 // IS_DEV_ENV=false: Generate did:plc:xxx and register with PLC_DIRECTORY_URL
113 isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
114 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
115 if plcDirectoryURL == "" {
116 plcDirectoryURL = "https://plc.directory" // Default to Bluesky's PLC
117 }
118 didGenerator := did.NewGenerator(isDevEnv, plcDirectoryURL)
119 log.Printf("DID generator initialized (dev_mode=%v, plc_url=%s)", isDevEnv, plcDirectoryURL)
120
121 instanceDID := os.Getenv("INSTANCE_DID")
122 if instanceDID == "" {
123 instanceDID = "did:web:coves.social" // Default for development
124 }
125
126 // V2: Extract instance domain for community handles
127 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security
128 // We cannot allow arbitrary domains to prevent impersonation attacks
129 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance
130 //
131 // TODO (Security - V2.1): Implement did:web domain verification
132 // Currently, any self-hoster can set INSTANCE_DID=did:web:nintendo.com without
133 // actually owning nintendo.com. This allows domain impersonation attacks.
134 // Solution: Verify domain ownership by fetching https://domain/.well-known/did.json
135 // and ensuring it matches the claimed DID. See: https://atproto.com/specs/did-web
136 // Alternatively, switch to did:plc for instance DIDs (cryptographically unique).
137 var instanceDomain string
138 if strings.HasPrefix(instanceDID, "did:web:") {
139 // Extract domain from did:web (this is the authoritative source)
140 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
141 } else {
142 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN
143 instanceDomain = os.Getenv("INSTANCE_DOMAIN")
144 if instanceDomain == "" {
145 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs")
146 }
147 }
148
149 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID)
150
151 // V2: Initialize PDS account provisioner for communities
152 provisioner := communities.NewPDSAccountProvisioner(userService, instanceDomain, defaultPDS)
153
154 communityService := communities.NewCommunityService(communityRepo, didGenerator, defaultPDS, instanceDID, instanceDomain, provisioner)
155
156 // Authenticate Coves instance with PDS to enable community record writes
157 // The instance needs a PDS account to write community records it owns
158 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
159 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
160 if pdsHandle != "" && pdsPassword != "" {
161 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
162 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
163 if authErr != nil {
164 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr)
165 log.Println("Community creation will fail until PDS authentication is configured")
166 } else {
167 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
168 svc.SetPDSAccessToken(accessToken)
169 log.Println("✓ Coves instance authenticated with PDS")
170 }
171 }
172 } else {
173 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
174 log.Println("Community creation via write-forward is disabled")
175 }
176
177 // Start Jetstream consumer for read-forward user indexing
178 jetstreamURL := os.Getenv("JETSTREAM_URL")
179 if jetstreamURL == "" {
180 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
181 }
182
183 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
184
185 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
186 ctx := context.Background()
187 go func() {
188 if startErr := userConsumer.Start(ctx); startErr != nil {
189 log.Printf("Jetstream consumer stopped: %v", startErr)
190 }
191 }()
192
193 log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
194
195 // Note: Community indexing happens through the same Jetstream firehose
196 // The CommunityEventConsumer is used by handlers when processing community-related events
197 // For now, community records are created via write-forward to PDS, then indexed when
198 // they appear in the firehose. A dedicated consumer can be added later if needed.
199 log.Println("Community event consumer initialized (processes events from firehose)")
200
201 // Start OAuth cleanup background job
202 go func() {
203 ticker := time.NewTicker(1 * time.Hour)
204 defer ticker.Stop()
205 for range ticker.C {
206 if pgStore, ok := sessionStore.(*oauthCore.PostgresSessionStore); ok {
207 if cleanupErr := pgStore.CleanupExpiredRequests(ctx); cleanupErr != nil {
208 log.Printf("Failed to cleanup expired OAuth requests: %v", cleanupErr)
209 }
210 if cleanupErr := pgStore.CleanupExpiredSessions(ctx); cleanupErr != nil {
211 log.Printf("Failed to cleanup expired OAuth sessions: %v", cleanupErr)
212 }
213 log.Println("OAuth cleanup completed")
214 }
215 }
216 }()
217
218 log.Println("Started OAuth cleanup background job (runs hourly)")
219
220 // Initialize OAuth cookie store (singleton)
221 cookieSecret, err := oauth.GetEnvBase64OrPlain("OAUTH_COOKIE_SECRET")
222 if err != nil {
223 log.Fatalf("Failed to load OAUTH_COOKIE_SECRET: %v", err)
224 }
225 if cookieSecret == "" {
226 log.Fatal("OAUTH_COOKIE_SECRET not configured")
227 }
228
229 if err := oauth.InitCookieStore(cookieSecret); err != nil {
230 log.Fatalf("Failed to initialize cookie store: %v", err)
231 }
232
233 // Initialize OAuth handlers
234 loginHandler := oauth.NewLoginHandler(identityResolver, sessionStore)
235 callbackHandler := oauth.NewCallbackHandler(sessionStore)
236 logoutHandler := oauth.NewLogoutHandler(sessionStore)
237
238 // OAuth routes (public endpoints)
239 r.Post("/oauth/login", loginHandler.HandleLogin)
240 r.Get("/oauth/callback", callbackHandler.HandleCallback)
241 r.Post("/oauth/logout", logoutHandler.HandleLogout)
242 r.Get("/oauth/client-metadata.json", oauth.HandleClientMetadata)
243 r.Get("/oauth/jwks.json", oauth.HandleJWKS)
244
245 log.Println("OAuth endpoints registered")
246
247 // Register XRPC routes
248 routes.RegisterUserRoutes(r, userService)
249 routes.RegisterCommunityRoutes(r, communityService)
250 log.Println("Community XRPC endpoints registered")
251
252 r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
253 w.WriteHeader(http.StatusOK)
254 if _, err := w.Write([]byte("OK")); err != nil {
255 log.Printf("Failed to write health check response: %v", err)
256 }
257 })
258
259 port := os.Getenv("APPVIEW_PORT")
260 if port == "" {
261 port = "8081" // Match .env.dev default
262 }
263
264 fmt.Printf("Coves AppView starting on port %s\n", port)
265 fmt.Printf("Default PDS: %s\n", defaultPDS)
266 log.Fatal(http.ListenAndServe(":"+port, r))
267}
268
269// authenticateWithPDS creates a session on the PDS and returns an access token
270func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
271 type CreateSessionRequest struct {
272 Identifier string `json:"identifier"`
273 Password string `json:"password"`
274 }
275
276 type CreateSessionResponse struct {
277 DID string `json:"did"`
278 Handle string `json:"handle"`
279 AccessJwt string `json:"accessJwt"`
280 }
281
282 reqBody, err := json.Marshal(CreateSessionRequest{
283 Identifier: handle,
284 Password: password,
285 })
286 if err != nil {
287 return "", fmt.Errorf("failed to marshal request: %w", err)
288 }
289
290 resp, err := http.Post(
291 pdsURL+"/xrpc/com.atproto.server.createSession",
292 "application/json",
293 bytes.NewReader(reqBody),
294 )
295 if err != nil {
296 return "", fmt.Errorf("failed to call PDS: %w", err)
297 }
298 defer func() {
299 if closeErr := resp.Body.Close(); closeErr != nil {
300 log.Printf("Failed to close response body: %v", closeErr)
301 }
302 }()
303
304 if resp.StatusCode != http.StatusOK {
305 body, readErr := io.ReadAll(resp.Body)
306 if readErr != nil {
307 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr)
308 }
309 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
310 }
311
312 var session CreateSessionResponse
313 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
314 return "", fmt.Errorf("failed to decode response: %w", err)
315 }
316
317 return session.AccessJwt, nil
318}