A community based topic aggregation platform built on atproto
1package main
2
3import (
4 "Coves/internal/api/middleware"
5 "Coves/internal/api/routes"
6 "Coves/internal/atproto/auth"
7 "Coves/internal/atproto/identity"
8 "Coves/internal/atproto/jetstream"
9 "Coves/internal/core/aggregators"
10 "Coves/internal/core/communities"
11 "Coves/internal/core/communityFeeds"
12 "Coves/internal/core/discover"
13 "Coves/internal/core/posts"
14 "Coves/internal/core/timeline"
15 "Coves/internal/core/users"
16 "bytes"
17 "context"
18 "database/sql"
19 "encoding/json"
20 "fmt"
21 "io"
22 "log"
23 "net/http"
24 "os"
25 "strings"
26 "time"
27
28 "github.com/go-chi/chi/v5"
29 chiMiddleware "github.com/go-chi/chi/v5/middleware"
30 _ "github.com/lib/pq"
31 "github.com/pressly/goose/v3"
32
33 postgresRepo "Coves/internal/db/postgres"
34)
35
36func main() {
37 // Database configuration (AppView database)
38 dbURL := os.Getenv("DATABASE_URL")
39 if dbURL == "" {
40 // Use dev database from .env.dev
41 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
42 }
43
44 // Default PDS URL for this Coves instance (supports self-hosting)
45 defaultPDS := os.Getenv("PDS_URL")
46 if defaultPDS == "" {
47 defaultPDS = "http://localhost:3001" // Local dev PDS
48 }
49
50 // Cursor secret for HMAC signing (prevents cursor manipulation)
51 cursorSecret := os.Getenv("CURSOR_SECRET")
52 if cursorSecret == "" {
53 // Generate a random secret if not set (dev mode)
54 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value
55 cursorSecret = "dev-cursor-secret-change-in-production"
56 log.Println("⚠️ WARNING: Using default cursor secret. Set CURSOR_SECRET env var in production!")
57 }
58
59 db, err := sql.Open("postgres", dbURL)
60 if err != nil {
61 log.Fatal("Failed to connect to database:", err)
62 }
63 defer func() {
64 if closeErr := db.Close(); closeErr != nil {
65 log.Printf("Failed to close database connection: %v", closeErr)
66 }
67 }()
68
69 if err = db.Ping(); err != nil {
70 log.Fatal("Failed to ping database:", err)
71 }
72
73 log.Println("Connected to AppView database")
74
75 // Run migrations
76 if err = goose.SetDialect("postgres"); err != nil {
77 log.Fatal("Failed to set goose dialect:", err)
78 }
79
80 if err = goose.Up(db, "internal/db/migrations"); err != nil {
81 log.Fatal("Failed to run migrations:", err)
82 }
83
84 log.Println("Migrations completed successfully")
85
86 r := chi.NewRouter()
87
88 r.Use(chiMiddleware.Logger)
89 r.Use(chiMiddleware.Recoverer)
90 r.Use(chiMiddleware.RequestID)
91
92 // Rate limiting: 100 requests per minute per IP
93 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
94 r.Use(rateLimiter.Middleware)
95
96 // Initialize identity resolver
97 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC
98 // directory as DID registration to ensure E2E tests work without hitting
99 // the production plc.directory
100 identityConfig := identity.DefaultConfig()
101
102 isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
103 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
104 if plcDirectoryURL == "" {
105 plcDirectoryURL = "https://plc.directory" // Default to production PLC
106 }
107
108 // In dev mode, use PLC_DIRECTORY_URL for identity resolution
109 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL
110 if isDevEnv {
111 identityConfig.PLCURL = plcDirectoryURL
112 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL)
113 } else {
114 // Production: Allow separate IDENTITY_PLC_URL for read operations
115 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" {
116 identityConfig.PLCURL = identityPLCURL
117 } else {
118 identityConfig.PLCURL = plcDirectoryURL
119 }
120 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL)
121 }
122
123 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
124 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil {
125 identityConfig.CacheTTL = duration
126 }
127 }
128
129 identityResolver := identity.NewResolver(db, identityConfig)
130
131 // Initialize atProto auth middleware for JWT validation
132 // Phase 1: Set skipVerify=true to test JWT parsing only
133 // Phase 2: Set skipVerify=false to enable full signature verification
134 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true"
135 if skipVerify {
136 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)")
137 log.Println(" Set AUTH_SKIP_VERIFY=false for production")
138 }
139
140 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour
141 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL)
142 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify)
143 log.Println("✅ atProto auth middleware initialized")
144
145 // Initialize repositories and services
146 userRepo := postgresRepo.NewUserRepository(db)
147 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
148
149 communityRepo := postgresRepo.NewCommunityRepository(db)
150
151 // V2.0: PDS-managed DID generation
152 // Community DIDs and keys are generated entirely by the PDS
153 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach)
154
155 instanceDID := os.Getenv("INSTANCE_DID")
156 if instanceDID == "" {
157 instanceDID = "did:web:coves.social" // Default for development
158 }
159
160 // V2: Extract instance domain for community handles
161 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security
162 // We cannot allow arbitrary domains to prevent impersonation attacks
163 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance
164 //
165 // SECURITY: did:web domain verification is implemented in the Jetstream consumer
166 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim()
167 // Communities with mismatched hostedBy domains are rejected during indexing
168 var instanceDomain string
169 if strings.HasPrefix(instanceDID, "did:web:") {
170 // Extract domain from did:web (this is the authoritative source)
171 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
172 } else {
173 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN
174 instanceDomain = os.Getenv("INSTANCE_DOMAIN")
175 if instanceDomain == "" {
176 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs")
177 }
178 }
179
180 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID)
181
182 // V2.0: Initialize PDS account provisioner for communities (simplified)
183 // PDS handles all DID and key generation - no Coves-side cryptography needed
184 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS)
185 log.Printf("✅ Community provisioner initialized (PDS-managed keys)")
186 log.Printf(" - Communities will be created at: %s", defaultPDS)
187 log.Printf(" - PDS will generate and manage all DIDs and keys")
188
189 // Initialize community service (no longer needs didGenerator directly)
190 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner)
191
192 // Authenticate Coves instance with PDS to enable community record writes
193 // The instance needs a PDS account to write community records it owns
194 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
195 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
196 if pdsHandle != "" && pdsPassword != "" {
197 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
198 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
199 if authErr != nil {
200 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr)
201 log.Println("Community creation will fail until PDS authentication is configured")
202 } else {
203 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
204 svc.SetPDSAccessToken(accessToken)
205 log.Println("✓ Coves instance authenticated with PDS")
206 }
207 }
208 } else {
209 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
210 log.Println("Community creation via write-forward is disabled")
211 }
212
213 // Start Jetstream consumer for read-forward user indexing
214 jetstreamURL := os.Getenv("JETSTREAM_URL")
215 if jetstreamURL == "" {
216 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
217 }
218
219 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
220
221 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
222 ctx := context.Background()
223 go func() {
224 if startErr := userConsumer.Start(ctx); startErr != nil {
225 log.Printf("Jetstream consumer stopped: %v", startErr)
226 }
227 }()
228
229 log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
230
231 // Start Jetstream consumer for community events (profiles and subscriptions)
232 // This consumer indexes:
233 // 1. Community profiles (social.coves.community.profile) - in community's own repo
234 // 2. User subscriptions (social.coves.community.subscription) - in user's repo
235 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL")
236 if communityJetstreamURL == "" {
237 // Local Jetstream for communities - filter to our instance's collections
238 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe)
239 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures
240 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription"
241 }
242
243 // Initialize community event consumer with did:web verification
244 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true"
245 if skipDIDWebVerification {
246 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)")
247 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production")
248 }
249
250 // Pass identity resolver to consumer for PLC handle resolution (source of truth)
251 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver)
252 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL)
253
254 go func() {
255 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil {
256 log.Printf("Community Jetstream consumer stopped: %v", startErr)
257 }
258 }()
259
260 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL)
261 log.Println(" - Indexing: social.coves.community.profile (community profiles)")
262 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)")
263
264 // Start JWKS cache cleanup background job
265 go func() {
266 ticker := time.NewTicker(1 * time.Hour)
267 defer ticker.Stop()
268 for range ticker.C {
269 jwksFetcher.CleanupExpiredCache()
270 log.Println("JWKS cache cleanup completed")
271 }
272 }()
273
274 log.Println("Started JWKS cache cleanup background job (runs hourly)")
275
276 // Initialize aggregator service
277 aggregatorRepo := postgresRepo.NewAggregatorRepository(db)
278 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService)
279 log.Println("✅ Aggregator service initialized")
280
281 // Initialize post service (with aggregator support)
282 postRepo := postgresRepo.NewPostRepository(db)
283 postService := posts.NewPostService(postRepo, communityService, aggregatorService, defaultPDS)
284
285 // Initialize vote repository (used by Jetstream consumer for indexing)
286 voteRepo := postgresRepo.NewVoteRepository(db)
287 log.Println("✅ Vote repository initialized (Jetstream indexing only)")
288
289 // Initialize feed service
290 feedRepo := postgresRepo.NewCommunityFeedRepository(db)
291 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
292 log.Println("✅ Feed service initialized")
293
294 // Initialize timeline service (home feed from subscribed communities)
295 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret)
296 timelineService := timeline.NewTimelineService(timelineRepo)
297 log.Println("✅ Timeline service initialized")
298
299 // Initialize discover service (public feed from all communities)
300 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret)
301 discoverService := discover.NewDiscoverService(discoverRepo)
302 log.Println("✅ Discover service initialized")
303
304 // Start Jetstream consumer for posts
305 // This consumer indexes posts created in community repositories via the firehose
306 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist
307 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL")
308 if postJetstreamURL == "" {
309 // Listen to post record creation events
310 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post"
311 }
312
313 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
314 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL)
315
316 go func() {
317 if startErr := postJetstreamConnector.Start(ctx); startErr != nil {
318 log.Printf("Post Jetstream consumer stopped: %v", startErr)
319 }
320 }()
321
322 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL)
323 log.Println(" - Indexing: social.coves.community.post CREATE operations")
324 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented")
325
326 // Start Jetstream consumer for aggregators
327 // This consumer indexes aggregator service declarations and authorization records
328 // Following Bluesky's pattern for feed generators and labelers
329 // NOTE: Uses the same Jetstream as communities, just filtering different collections
330 aggregatorJetstreamURL := communityJetstreamURL
331 // Override if specific URL needed for testing
332 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" {
333 aggregatorJetstreamURL = envURL
334 } else if aggregatorJetstreamURL == "" {
335 // Fallback if community URL also not set
336 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization"
337 }
338
339 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo)
340 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL)
341
342 go func() {
343 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil {
344 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr)
345 }
346 }()
347
348 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL)
349 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)")
350 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)")
351
352 // Start Jetstream consumer for votes
353 // This consumer indexes votes from user repositories and updates post vote counts
354 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL")
355 if voteJetstreamURL == "" {
356 // Listen to vote record CREATE/DELETE events from user repositories
357 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote"
358 }
359
360 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
361 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL)
362
363 go func() {
364 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil {
365 log.Printf("Vote Jetstream consumer stopped: %v", startErr)
366 }
367 }()
368
369 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL)
370 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations")
371 log.Println(" - Updating: Post vote counts atomically")
372
373 // Register XRPC routes
374 routes.RegisterUserRoutes(r, userService)
375 routes.RegisterCommunityRoutes(r, communityService, authMiddleware)
376 log.Println("Community XRPC endpoints registered with OAuth authentication")
377
378 routes.RegisterPostRoutes(r, postService, authMiddleware)
379 log.Println("Post XRPC endpoints registered with OAuth authentication")
380
381 // Vote write endpoints removed - clients write directly to their PDS
382 // The AppView indexes votes from Jetstream (see vote consumer above)
383
384 routes.RegisterCommunityFeedRoutes(r, feedService)
385 log.Println("Feed XRPC endpoints registered (public, no auth required)")
386
387 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware)
388 log.Println("Timeline XRPC endpoints registered (requires authentication)")
389
390 routes.RegisterDiscoverRoutes(r, discoverService)
391 log.Println("Discover XRPC endpoints registered (public, no auth required)")
392
393 routes.RegisterAggregatorRoutes(r, aggregatorService)
394 log.Println("Aggregator XRPC endpoints registered (query endpoints public)")
395
396 r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
397 w.WriteHeader(http.StatusOK)
398 if _, err := w.Write([]byte("OK")); err != nil {
399 log.Printf("Failed to write health check response: %v", err)
400 }
401 })
402
403 port := os.Getenv("APPVIEW_PORT")
404 if port == "" {
405 port = "8081" // Match .env.dev default
406 }
407
408 fmt.Printf("Coves AppView starting on port %s\n", port)
409 fmt.Printf("Default PDS: %s\n", defaultPDS)
410 log.Fatal(http.ListenAndServe(":"+port, r))
411}
412
413// authenticateWithPDS creates a session on the PDS and returns an access token
414func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
415 type CreateSessionRequest struct {
416 Identifier string `json:"identifier"`
417 Password string `json:"password"`
418 }
419
420 type CreateSessionResponse struct {
421 DID string `json:"did"`
422 Handle string `json:"handle"`
423 AccessJwt string `json:"accessJwt"`
424 }
425
426 reqBody, err := json.Marshal(CreateSessionRequest{
427 Identifier: handle,
428 Password: password,
429 })
430 if err != nil {
431 return "", fmt.Errorf("failed to marshal request: %w", err)
432 }
433
434 resp, err := http.Post(
435 pdsURL+"/xrpc/com.atproto.server.createSession",
436 "application/json",
437 bytes.NewReader(reqBody),
438 )
439 if err != nil {
440 return "", fmt.Errorf("failed to call PDS: %w", err)
441 }
442 defer func() {
443 if closeErr := resp.Body.Close(); closeErr != nil {
444 log.Printf("Failed to close response body: %v", closeErr)
445 }
446 }()
447
448 if resp.StatusCode != http.StatusOK {
449 body, readErr := io.ReadAll(resp.Body)
450 if readErr != nil {
451 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr)
452 }
453 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
454 }
455
456 var session CreateSessionResponse
457 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
458 return "", fmt.Errorf("failed to decode response: %w", err)
459 }
460
461 return session.AccessJwt, nil
462}