A community-based topic aggregation platform built on atproto
1package main
2
3import (
4 "Coves/internal/api/middleware"
5 "Coves/internal/api/routes"
6 "Coves/internal/atproto/auth"
7 "Coves/internal/atproto/identity"
8 "Coves/internal/atproto/jetstream"
9 "Coves/internal/core/communities"
10 "Coves/internal/core/users"
11 "bytes"
12 "context"
13 "database/sql"
14 "encoding/json"
15 "fmt"
16 "io"
17 "log"
18 "net/http"
19 "os"
20 "strings"
21 "time"
22
23 "github.com/go-chi/chi/v5"
24 chiMiddleware "github.com/go-chi/chi/v5/middleware"
25 _ "github.com/lib/pq"
26 "github.com/pressly/goose/v3"
27
28 postgresRepo "Coves/internal/db/postgres"
29)
30
31func main() {
32 // Database configuration (AppView database)
33 dbURL := os.Getenv("DATABASE_URL")
34 if dbURL == "" {
35 // Use dev database from .env.dev
36 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
37 }
38
39 // Default PDS URL for this Coves instance (supports self-hosting)
40 defaultPDS := os.Getenv("PDS_URL")
41 if defaultPDS == "" {
42 defaultPDS = "http://localhost:3001" // Local dev PDS
43 }
44
45 db, err := sql.Open("postgres", dbURL)
46 if err != nil {
47 log.Fatal("Failed to connect to database:", err)
48 }
49 defer func() {
50 if closeErr := db.Close(); closeErr != nil {
51 log.Printf("Failed to close database connection: %v", closeErr)
52 }
53 }()
54
55 if err = db.Ping(); err != nil {
56 log.Fatal("Failed to ping database:", err)
57 }
58
59 log.Println("Connected to AppView database")
60
61 // Run migrations
62 if err = goose.SetDialect("postgres"); err != nil {
63 log.Fatal("Failed to set goose dialect:", err)
64 }
65
66 if err = goose.Up(db, "internal/db/migrations"); err != nil {
67 log.Fatal("Failed to run migrations:", err)
68 }
69
70 log.Println("Migrations completed successfully")
71
72 r := chi.NewRouter()
73
74 r.Use(chiMiddleware.Logger)
75 r.Use(chiMiddleware.Recoverer)
76 r.Use(chiMiddleware.RequestID)
77
78 // Rate limiting: 100 requests per minute per IP
79 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
80 r.Use(rateLimiter.Middleware)
81
82 // Initialize identity resolver
83 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC
84 // directory as DID registration to ensure E2E tests work without hitting
85 // the production plc.directory
86 identityConfig := identity.DefaultConfig()
87
88 isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
89 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
90 if plcDirectoryURL == "" {
91 plcDirectoryURL = "https://plc.directory" // Default to production PLC
92 }
93
94 // In dev mode, use PLC_DIRECTORY_URL for identity resolution
95 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL
96 if isDevEnv {
97 identityConfig.PLCURL = plcDirectoryURL
98 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL)
99 } else {
100 // Production: Allow separate IDENTITY_PLC_URL for read operations
101 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" {
102 identityConfig.PLCURL = identityPLCURL
103 } else {
104 identityConfig.PLCURL = plcDirectoryURL
105 }
106 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL)
107 }
108
109 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
110 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil {
111 identityConfig.CacheTTL = duration
112 }
113 }
114
115 identityResolver := identity.NewResolver(db, identityConfig)
116
117 // Initialize atProto auth middleware for JWT validation
118 // Phase 1: Set skipVerify=true to test JWT parsing only
119 // Phase 2: Set skipVerify=false to enable full signature verification
120 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true"
121 if skipVerify {
122 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)")
123 log.Println(" Set AUTH_SKIP_VERIFY=false for production")
124 }
125
126 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour
127 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL)
128 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify)
129 log.Println("✅ atProto auth middleware initialized")
130
131 // Initialize repositories and services
132 userRepo := postgresRepo.NewUserRepository(db)
133 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
134
135 communityRepo := postgresRepo.NewCommunityRepository(db)
136
137 // V2.0: PDS-managed DID generation
138 // Community DIDs and keys are generated entirely by the PDS
139 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach)
140
141 instanceDID := os.Getenv("INSTANCE_DID")
142 if instanceDID == "" {
143 instanceDID = "did:web:coves.social" // Default for development
144 }
145
146 // V2: Extract instance domain for community handles
147 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security
148 // We cannot allow arbitrary domains to prevent impersonation attacks
149 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance
150 //
151 // TODO (Security - V2.1): Implement did:web domain verification
152 // Currently, any self-hoster can set INSTANCE_DID=did:web:nintendo.com without
153 // actually owning nintendo.com. This allows domain impersonation attacks.
154 // Solution: Verify domain ownership by fetching https://domain/.well-known/did.json
155 // and ensuring it matches the claimed DID. See: https://atproto.com/specs/did-web
156 // Alternatively, switch to did:plc for instance DIDs (cryptographically unique).
157 var instanceDomain string
158 if strings.HasPrefix(instanceDID, "did:web:") {
159 // Extract domain from did:web (this is the authoritative source)
160 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
161 } else {
162 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN
163 instanceDomain = os.Getenv("INSTANCE_DOMAIN")
164 if instanceDomain == "" {
165 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs")
166 }
167 }
168
169 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID)
170
171 // V2.0: Initialize PDS account provisioner for communities (simplified)
172 // PDS handles all DID and key generation - no Coves-side cryptography needed
173 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS)
174 log.Printf("✅ Community provisioner initialized (PDS-managed keys)")
175 log.Printf(" - Communities will be created at: %s", defaultPDS)
176 log.Printf(" - PDS will generate and manage all DIDs and keys")
177
178 // Initialize community service (no longer needs didGenerator directly)
179 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner)
180
181 // Authenticate Coves instance with PDS to enable community record writes
182 // The instance needs a PDS account to write community records it owns
183 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
184 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
185 if pdsHandle != "" && pdsPassword != "" {
186 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
187 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
188 if authErr != nil {
189 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr)
190 log.Println("Community creation will fail until PDS authentication is configured")
191 } else {
192 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
193 svc.SetPDSAccessToken(accessToken)
194 log.Println("✓ Coves instance authenticated with PDS")
195 }
196 }
197 } else {
198 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
199 log.Println("Community creation via write-forward is disabled")
200 }
201
202 // Start Jetstream consumer for read-forward user indexing
203 jetstreamURL := os.Getenv("JETSTREAM_URL")
204 if jetstreamURL == "" {
205 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
206 }
207
208 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
209
210 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
211 ctx := context.Background()
212 go func() {
213 if startErr := userConsumer.Start(ctx); startErr != nil {
214 log.Printf("Jetstream consumer stopped: %v", startErr)
215 }
216 }()
217
218 log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
219
220 // Start Jetstream consumer for community events (profiles and subscriptions)
221 // This consumer indexes:
222 // 1. Community profiles (social.coves.community.profile) - in community's own repo
223 // 2. User subscriptions (social.coves.community.subscription) - in user's repo
224 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL")
225 if communityJetstreamURL == "" {
226 // Local Jetstream for communities - filter to our instance's collections
227 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe)
228 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures
229 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription"
230 }
231
232 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo)
233 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL)
234
235 go func() {
236 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil {
237 log.Printf("Community Jetstream consumer stopped: %v", startErr)
238 }
239 }()
240
241 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL)
242 log.Println(" - Indexing: social.coves.community.profile (community profiles)")
243 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)")
244
245 // Start JWKS cache cleanup background job
246 go func() {
247 ticker := time.NewTicker(1 * time.Hour)
248 defer ticker.Stop()
249 for range ticker.C {
250 jwksFetcher.CleanupExpiredCache()
251 log.Println("JWKS cache cleanup completed")
252 }
253 }()
254
255 log.Println("Started JWKS cache cleanup background job (runs hourly)")
256
257 // Register XRPC routes
258 routes.RegisterUserRoutes(r, userService)
259 routes.RegisterCommunityRoutes(r, communityService, authMiddleware)
260 log.Println("Community XRPC endpoints registered with OAuth authentication")
261
262 r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
263 w.WriteHeader(http.StatusOK)
264 if _, err := w.Write([]byte("OK")); err != nil {
265 log.Printf("Failed to write health check response: %v", err)
266 }
267 })
268
269 port := os.Getenv("APPVIEW_PORT")
270 if port == "" {
271 port = "8081" // Match .env.dev default
272 }
273
274 fmt.Printf("Coves AppView starting on port %s\n", port)
275 fmt.Printf("Default PDS: %s\n", defaultPDS)
276 log.Fatal(http.ListenAndServe(":"+port, r))
277}
278
// authenticateWithPDS creates a session on the PDS and returns its access token.
//
// It POSTs a JSON body containing the identifier (handle) and password to
// <pdsURL>/xrpc/com.atproto.server.createSession and returns the "accessJwt"
// field of the JSON response. A trailing slash on pdsURL is tolerated.
//
// An error is returned when the request cannot be built or sent (including
// when the PDS does not answer within the client timeout), when the PDS
// replies with a status other than 200 OK (the error carries the status code
// and a bounded prefix of the response body), when the response body is not
// valid JSON, or when the response contains no access token.
func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
	// Upper bound on how long startup may wait for the PDS to respond.
	const requestTimeout = 10 * time.Second
	// Upper bound on how much of an error response body is read into the
	// returned error message.
	const maxErrorBodyBytes = 4096

	type CreateSessionRequest struct {
		Identifier string `json:"identifier"`
		Password   string `json:"password"`
	}

	type CreateSessionResponse struct {
		DID       string `json:"did"`
		Handle    string `json:"handle"`
		AccessJwt string `json:"accessJwt"`
	}

	reqBody, err := json.Marshal(CreateSessionRequest{
		Identifier: handle,
		Password:   password,
	})
	if err != nil {
		return "", fmt.Errorf("failed to marshal request: %w", err)
	}

	// http.Post goes through http.DefaultClient, which has no timeout; a PDS
	// that accepts the connection but never answers would block the caller
	// forever. Use a dedicated client with a bounded timeout instead.
	client := &http.Client{Timeout: requestTimeout}

	// Trim trailing slashes so "http://host/" does not yield "//xrpc/...".
	endpoint := strings.TrimRight(pdsURL, "/") + "/xrpc/com.atproto.server.createSession"

	resp, err := client.Post(endpoint, "application/json", bytes.NewReader(reqBody))
	if err != nil {
		return "", fmt.Errorf("failed to call PDS: %w", err)
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			log.Printf("Failed to close response body: %v", closeErr)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		// Cap the read so a misbehaving endpoint cannot make us buffer an
		// arbitrarily large body just to build an error message.
		body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes))
		if readErr != nil {
			return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr)
		}
		return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
	}

	var session CreateSessionResponse
	if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
		return "", fmt.Errorf("failed to decode response: %w", err)
	}

	// A 200 response without a token would otherwise be reported as success
	// with an empty string, which callers would then use as a credential.
	if session.AccessJwt == "" {
		return "", fmt.Errorf("PDS response did not include an access token")
	}

	return session.AccessJwt, nil
}