A community-based topic aggregation platform built on atproto
1package main
2
3import (
4 "Coves/internal/api/middleware"
5 "Coves/internal/api/routes"
6 "Coves/internal/atproto/auth"
7 "Coves/internal/atproto/identity"
8 "Coves/internal/atproto/jetstream"
9 "Coves/internal/core/communities"
10 "Coves/internal/core/posts"
11 "Coves/internal/core/users"
12 "bytes"
13 "context"
14 "database/sql"
15 "encoding/json"
16 "fmt"
17 "io"
18 "log"
19 "net/http"
20 "os"
21 "strings"
22 "time"
23
24 "github.com/go-chi/chi/v5"
25 chiMiddleware "github.com/go-chi/chi/v5/middleware"
26 _ "github.com/lib/pq"
27 "github.com/pressly/goose/v3"
28
29 postgresRepo "Coves/internal/db/postgres"
30)
31
32func main() {
33 // Database configuration (AppView database)
34 dbURL := os.Getenv("DATABASE_URL")
35 if dbURL == "" {
36 // Use dev database from .env.dev
37 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
38 }
39
40 // Default PDS URL for this Coves instance (supports self-hosting)
41 defaultPDS := os.Getenv("PDS_URL")
42 if defaultPDS == "" {
43 defaultPDS = "http://localhost:3001" // Local dev PDS
44 }
45
46 db, err := sql.Open("postgres", dbURL)
47 if err != nil {
48 log.Fatal("Failed to connect to database:", err)
49 }
50 defer func() {
51 if closeErr := db.Close(); closeErr != nil {
52 log.Printf("Failed to close database connection: %v", closeErr)
53 }
54 }()
55
56 if err = db.Ping(); err != nil {
57 log.Fatal("Failed to ping database:", err)
58 }
59
60 log.Println("Connected to AppView database")
61
62 // Run migrations
63 if err = goose.SetDialect("postgres"); err != nil {
64 log.Fatal("Failed to set goose dialect:", err)
65 }
66
67 if err = goose.Up(db, "internal/db/migrations"); err != nil {
68 log.Fatal("Failed to run migrations:", err)
69 }
70
71 log.Println("Migrations completed successfully")
72
73 r := chi.NewRouter()
74
75 r.Use(chiMiddleware.Logger)
76 r.Use(chiMiddleware.Recoverer)
77 r.Use(chiMiddleware.RequestID)
78
79 // Rate limiting: 100 requests per minute per IP
80 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
81 r.Use(rateLimiter.Middleware)
82
83 // Initialize identity resolver
84 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC
85 // directory as DID registration to ensure E2E tests work without hitting
86 // the production plc.directory
87 identityConfig := identity.DefaultConfig()
88
89 isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
90 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
91 if plcDirectoryURL == "" {
92 plcDirectoryURL = "https://plc.directory" // Default to production PLC
93 }
94
95 // In dev mode, use PLC_DIRECTORY_URL for identity resolution
96 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL
97 if isDevEnv {
98 identityConfig.PLCURL = plcDirectoryURL
99 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL)
100 } else {
101 // Production: Allow separate IDENTITY_PLC_URL for read operations
102 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" {
103 identityConfig.PLCURL = identityPLCURL
104 } else {
105 identityConfig.PLCURL = plcDirectoryURL
106 }
107 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL)
108 }
109
110 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
111 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil {
112 identityConfig.CacheTTL = duration
113 }
114 }
115
116 identityResolver := identity.NewResolver(db, identityConfig)
117
118 // Initialize atProto auth middleware for JWT validation
119 // Phase 1: Set skipVerify=true to test JWT parsing only
120 // Phase 2: Set skipVerify=false to enable full signature verification
121 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true"
122 if skipVerify {
123 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)")
124 log.Println(" Set AUTH_SKIP_VERIFY=false for production")
125 }
126
127 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour
128 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL)
129 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify)
130 log.Println("✅ atProto auth middleware initialized")
131
132 // Initialize repositories and services
133 userRepo := postgresRepo.NewUserRepository(db)
134 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
135
136 communityRepo := postgresRepo.NewCommunityRepository(db)
137
138 // V2.0: PDS-managed DID generation
139 // Community DIDs and keys are generated entirely by the PDS
140 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach)
141
142 instanceDID := os.Getenv("INSTANCE_DID")
143 if instanceDID == "" {
144 instanceDID = "did:web:coves.social" // Default for development
145 }
146
147 // V2: Extract instance domain for community handles
148 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security
149 // We cannot allow arbitrary domains to prevent impersonation attacks
150 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance
151 //
152 // SECURITY: did:web domain verification is implemented in the Jetstream consumer
153 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim()
154 // Communities with mismatched hostedBy domains are rejected during indexing
155 var instanceDomain string
156 if strings.HasPrefix(instanceDID, "did:web:") {
157 // Extract domain from did:web (this is the authoritative source)
158 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
159 } else {
160 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN
161 instanceDomain = os.Getenv("INSTANCE_DOMAIN")
162 if instanceDomain == "" {
163 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs")
164 }
165 }
166
167 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID)
168
169 // V2.0: Initialize PDS account provisioner for communities (simplified)
170 // PDS handles all DID and key generation - no Coves-side cryptography needed
171 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS)
172 log.Printf("✅ Community provisioner initialized (PDS-managed keys)")
173 log.Printf(" - Communities will be created at: %s", defaultPDS)
174 log.Printf(" - PDS will generate and manage all DIDs and keys")
175
176 // Initialize community service (no longer needs didGenerator directly)
177 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner)
178
179 // Authenticate Coves instance with PDS to enable community record writes
180 // The instance needs a PDS account to write community records it owns
181 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
182 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
183 if pdsHandle != "" && pdsPassword != "" {
184 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
185 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
186 if authErr != nil {
187 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr)
188 log.Println("Community creation will fail until PDS authentication is configured")
189 } else {
190 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
191 svc.SetPDSAccessToken(accessToken)
192 log.Println("✓ Coves instance authenticated with PDS")
193 }
194 }
195 } else {
196 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
197 log.Println("Community creation via write-forward is disabled")
198 }
199
200 // Start Jetstream consumer for read-forward user indexing
201 jetstreamURL := os.Getenv("JETSTREAM_URL")
202 if jetstreamURL == "" {
203 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
204 }
205
206 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
207
208 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
209 ctx := context.Background()
210 go func() {
211 if startErr := userConsumer.Start(ctx); startErr != nil {
212 log.Printf("Jetstream consumer stopped: %v", startErr)
213 }
214 }()
215
216 log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
217
218 // Start Jetstream consumer for community events (profiles and subscriptions)
219 // This consumer indexes:
220 // 1. Community profiles (social.coves.community.profile) - in community's own repo
221 // 2. User subscriptions (social.coves.community.subscription) - in user's repo
222 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL")
223 if communityJetstreamURL == "" {
224 // Local Jetstream for communities - filter to our instance's collections
225 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe)
226 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures
227 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription"
228 }
229
230 // Initialize community event consumer with did:web verification
231 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true"
232 if skipDIDWebVerification {
233 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)")
234 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production")
235 }
236
237 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification)
238 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL)
239
240 go func() {
241 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil {
242 log.Printf("Community Jetstream consumer stopped: %v", startErr)
243 }
244 }()
245
246 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL)
247 log.Println(" - Indexing: social.coves.community.profile (community profiles)")
248 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)")
249
250 // Start JWKS cache cleanup background job
251 go func() {
252 ticker := time.NewTicker(1 * time.Hour)
253 defer ticker.Stop()
254 for range ticker.C {
255 jwksFetcher.CleanupExpiredCache()
256 log.Println("JWKS cache cleanup completed")
257 }
258 }()
259
260 log.Println("Started JWKS cache cleanup background job (runs hourly)")
261
262 // Initialize post service
263 postRepo := postgresRepo.NewPostRepository(db)
264 postService := posts.NewPostService(postRepo, communityService, defaultPDS)
265
266 // Start Jetstream consumer for posts
267 // This consumer indexes posts created in community repositories via the firehose
268 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist
269 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL")
270 if postJetstreamURL == "" {
271 // Listen to post record creation events
272 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.post.record"
273 }
274
275 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
276 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL)
277
278 go func() {
279 if startErr := postJetstreamConnector.Start(ctx); startErr != nil {
280 log.Printf("Post Jetstream consumer stopped: %v", startErr)
281 }
282 }()
283
284 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL)
285 log.Println(" - Indexing: social.coves.post.record CREATE operations")
286 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented")
287
288 // Register XRPC routes
289 routes.RegisterUserRoutes(r, userService)
290 routes.RegisterCommunityRoutes(r, communityService, authMiddleware)
291 log.Println("Community XRPC endpoints registered with OAuth authentication")
292
293 routes.RegisterPostRoutes(r, postService, authMiddleware)
294 log.Println("Post XRPC endpoints registered with OAuth authentication")
295
296 r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
297 w.WriteHeader(http.StatusOK)
298 if _, err := w.Write([]byte("OK")); err != nil {
299 log.Printf("Failed to write health check response: %v", err)
300 }
301 })
302
303 port := os.Getenv("APPVIEW_PORT")
304 if port == "" {
305 port = "8081" // Match .env.dev default
306 }
307
308 fmt.Printf("Coves AppView starting on port %s\n", port)
309 fmt.Printf("Default PDS: %s\n", defaultPDS)
310 log.Fatal(http.ListenAndServe(":"+port, r))
311}
312
// authenticateWithPDS creates a session on the PDS by POSTing to
// com.atproto.server.createSession and returns the session's access JWT.
//
// pdsURL is the base URL of the PDS (a trailing slash is tolerated), handle is
// sent as the session identifier and password as the account password.
//
// An error is returned when the request cannot be encoded or sent (including
// when the PDS does not answer within the request timeout), when the PDS
// responds with any status other than 200 OK (the error carries a bounded
// excerpt of the response body), when the response body is not valid JSON, or
// when the response contains no access token.
func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
	const (
		// requestTimeout bounds the whole exchange so an unresponsive PDS
		// cannot block the caller forever.
		requestTimeout = 30 * time.Second
		// maxErrorBody caps how much of a failure response is read into
		// the returned error message.
		maxErrorBody = 4 << 10
	)

	type createSessionRequest struct {
		Identifier string `json:"identifier"`
		Password   string `json:"password"`
	}

	type createSessionResponse struct {
		DID       string `json:"did"`
		Handle    string `json:"handle"`
		AccessJwt string `json:"accessJwt"`
	}

	reqBody, err := json.Marshal(createSessionRequest{
		Identifier: handle,
		Password:   password,
	})
	if err != nil {
		return "", fmt.Errorf("failed to marshal request: %w", err)
	}

	// Trim trailing slashes so a base URL such as "http://host/" does not
	// yield a "//xrpc/..." request path.
	endpoint := strings.TrimRight(pdsURL, "/") + "/xrpc/com.atproto.server.createSession"

	// http.DefaultClient has no timeout; use a dedicated client with one.
	client := &http.Client{Timeout: requestTimeout}
	resp, err := client.Post(endpoint, "application/json", bytes.NewReader(reqBody))
	if err != nil {
		return "", fmt.Errorf("failed to call PDS: %w", err)
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			log.Printf("Failed to close response body: %v", closeErr)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxErrorBody))
		if readErr != nil {
			return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr)
		}
		return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
	}

	var session createSessionResponse
	if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
		return "", fmt.Errorf("failed to decode response: %w", err)
	}

	// A 200 response without a token is not a usable session; reporting
	// success here would let the caller install an empty credential.
	if session.AccessJwt == "" {
		return "", fmt.Errorf("PDS response did not include an access token")
	}

	return session.AccessJwt, nil
}