A community-based topic aggregation platform built on atproto
1package main
2
3import (
4 "Coves/internal/api/middleware"
5 "Coves/internal/api/routes"
6 "Coves/internal/atproto/auth"
7 "Coves/internal/atproto/identity"
8 "Coves/internal/atproto/jetstream"
9 "Coves/internal/core/aggregators"
10 "Coves/internal/core/blobs"
11 "Coves/internal/core/comments"
12 "Coves/internal/core/communities"
13 "Coves/internal/core/communityFeeds"
14 "Coves/internal/core/discover"
15 "Coves/internal/core/posts"
16 "Coves/internal/core/timeline"
17 "Coves/internal/core/unfurl"
18 "Coves/internal/core/users"
19 "bytes"
20 "context"
21 "database/sql"
22 "encoding/json"
23 "fmt"
24 "io"
25 "log"
26 "net/http"
27 "os"
28 "strings"
29 "time"
30
31 "github.com/go-chi/chi/v5"
32 chiMiddleware "github.com/go-chi/chi/v5/middleware"
33 _ "github.com/lib/pq"
34 "github.com/pressly/goose/v3"
35
36 commentsAPI "Coves/internal/api/handlers/comments"
37
38 postgresRepo "Coves/internal/db/postgres"
39)
40
41func main() {
42 // Database configuration (AppView database)
43 dbURL := os.Getenv("DATABASE_URL")
44 if dbURL == "" {
45 // Use dev database from .env.dev
46 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
47 }
48
49 // Default PDS URL for this Coves instance (supports self-hosting)
50 defaultPDS := os.Getenv("PDS_URL")
51 if defaultPDS == "" {
52 defaultPDS = "http://localhost:3001" // Local dev PDS
53 }
54
55 // Cursor secret for HMAC signing (prevents cursor manipulation)
56 cursorSecret := os.Getenv("CURSOR_SECRET")
57 if cursorSecret == "" {
58 // Generate a random secret if not set (dev mode)
59 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value
60 cursorSecret = "dev-cursor-secret-change-in-production"
61 log.Println("⚠️ WARNING: Using default cursor secret. Set CURSOR_SECRET env var in production!")
62 }
63
64 db, err := sql.Open("postgres", dbURL)
65 if err != nil {
66 log.Fatal("Failed to connect to database:", err)
67 }
68 defer func() {
69 if closeErr := db.Close(); closeErr != nil {
70 log.Printf("Failed to close database connection: %v", closeErr)
71 }
72 }()
73
74 if err = db.Ping(); err != nil {
75 log.Fatal("Failed to ping database:", err)
76 }
77
78 log.Println("Connected to AppView database")
79
80 // Run migrations
81 if err = goose.SetDialect("postgres"); err != nil {
82 log.Fatal("Failed to set goose dialect:", err)
83 }
84
85 if err = goose.Up(db, "internal/db/migrations"); err != nil {
86 log.Fatal("Failed to run migrations:", err)
87 }
88
89 log.Println("Migrations completed successfully")
90
91 r := chi.NewRouter()
92
93 r.Use(chiMiddleware.Logger)
94 r.Use(chiMiddleware.Recoverer)
95 r.Use(chiMiddleware.RequestID)
96
97 // Rate limiting: 100 requests per minute per IP
98 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
99 r.Use(rateLimiter.Middleware)
100
101 // Initialize identity resolver
102 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC
103 // directory as DID registration to ensure E2E tests work without hitting
104 // the production plc.directory
105 identityConfig := identity.DefaultConfig()
106
107 isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
108 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
109 if plcDirectoryURL == "" {
110 plcDirectoryURL = "https://plc.directory" // Default to production PLC
111 }
112
113 // In dev mode, use PLC_DIRECTORY_URL for identity resolution
114 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL
115 if isDevEnv {
116 identityConfig.PLCURL = plcDirectoryURL
117 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL)
118 } else {
119 // Production: Allow separate IDENTITY_PLC_URL for read operations
120 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" {
121 identityConfig.PLCURL = identityPLCURL
122 } else {
123 identityConfig.PLCURL = plcDirectoryURL
124 }
125 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL)
126 }
127
128 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
129 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil {
130 identityConfig.CacheTTL = duration
131 }
132 }
133
134 identityResolver := identity.NewResolver(db, identityConfig)
135
136 // Initialize atProto auth middleware for JWT validation
137 // Phase 1: Set skipVerify=true to test JWT parsing only
138 // Phase 2: Set skipVerify=false to enable full signature verification
139 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true"
140 if skipVerify {
141 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)")
142 log.Println(" Set AUTH_SKIP_VERIFY=false for production")
143 }
144
145 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour
146 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL)
147 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify)
148 log.Println("✅ atProto auth middleware initialized")
149
150 // Initialize repositories and services
151 userRepo := postgresRepo.NewUserRepository(db)
152 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
153
154 communityRepo := postgresRepo.NewCommunityRepository(db)
155
156 // V2.0: PDS-managed DID generation
157 // Community DIDs and keys are generated entirely by the PDS
158 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach)
159
160 instanceDID := os.Getenv("INSTANCE_DID")
161 if instanceDID == "" {
162 instanceDID = "did:web:coves.social" // Default for development
163 }
164
165 // V2: Extract instance domain for community handles
166 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security
167 // We cannot allow arbitrary domains to prevent impersonation attacks
168 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance
169 //
170 // SECURITY: did:web domain verification is implemented in the Jetstream consumer
171 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim()
172 // Communities with mismatched hostedBy domains are rejected during indexing
173 var instanceDomain string
174 if strings.HasPrefix(instanceDID, "did:web:") {
175 // Extract domain from did:web (this is the authoritative source)
176 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
177 } else {
178 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN
179 instanceDomain = os.Getenv("INSTANCE_DOMAIN")
180 if instanceDomain == "" {
181 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs")
182 }
183 }
184
185 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID)
186
187 // V2.0: Initialize PDS account provisioner for communities (simplified)
188 // PDS handles all DID and key generation - no Coves-side cryptography needed
189 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS)
190 log.Printf("✅ Community provisioner initialized (PDS-managed keys)")
191 log.Printf(" - Communities will be created at: %s", defaultPDS)
192 log.Printf(" - PDS will generate and manage all DIDs and keys")
193
194 // Initialize community service (no longer needs didGenerator directly)
195 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner)
196
197 // Authenticate Coves instance with PDS to enable community record writes
198 // The instance needs a PDS account to write community records it owns
199 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
200 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
201 if pdsHandle != "" && pdsPassword != "" {
202 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
203 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
204 if authErr != nil {
205 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr)
206 log.Println("Community creation will fail until PDS authentication is configured")
207 } else {
208 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
209 svc.SetPDSAccessToken(accessToken)
210 log.Println("✓ Coves instance authenticated with PDS")
211 }
212 }
213 } else {
214 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
215 log.Println("Community creation via write-forward is disabled")
216 }
217
218 // Start Jetstream consumer for read-forward user indexing
219 jetstreamURL := os.Getenv("JETSTREAM_URL")
220 if jetstreamURL == "" {
221 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
222 }
223
224 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
225
226 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
227 ctx := context.Background()
228 go func() {
229 if startErr := userConsumer.Start(ctx); startErr != nil {
230 log.Printf("Jetstream consumer stopped: %v", startErr)
231 }
232 }()
233
234 log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
235
236 // Start Jetstream consumer for community events (profiles and subscriptions)
237 // This consumer indexes:
238 // 1. Community profiles (social.coves.community.profile) - in community's own repo
239 // 2. User subscriptions (social.coves.community.subscription) - in user's repo
240 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL")
241 if communityJetstreamURL == "" {
242 // Local Jetstream for communities - filter to our instance's collections
243 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe)
244 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures
245 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription"
246 }
247
248 // Initialize community event consumer with did:web verification
249 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true"
250 if skipDIDWebVerification {
251 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)")
252 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production")
253 }
254
255 // Pass identity resolver to consumer for PLC handle resolution (source of truth)
256 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification, identityResolver)
257 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL)
258
259 go func() {
260 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil {
261 log.Printf("Community Jetstream consumer stopped: %v", startErr)
262 }
263 }()
264
265 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL)
266 log.Println(" - Indexing: social.coves.community.profile (community profiles)")
267 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)")
268
269 // Start JWKS cache cleanup background job
270 go func() {
271 ticker := time.NewTicker(1 * time.Hour)
272 defer ticker.Stop()
273 for range ticker.C {
274 jwksFetcher.CleanupExpiredCache()
275 log.Println("JWKS cache cleanup completed")
276 }
277 }()
278
279 log.Println("Started JWKS cache cleanup background job (runs hourly)")
280
281 // Initialize aggregator service
282 aggregatorRepo := postgresRepo.NewAggregatorRepository(db)
283 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService)
284 log.Println("✅ Aggregator service initialized")
285
286 // Initialize unfurl cache repository
287 unfurlRepo := unfurl.NewRepository(db)
288
289 // Initialize blob upload service
290 blobService := blobs.NewBlobService(defaultPDS)
291
292 // Initialize unfurl service with configuration
293 unfurlService := unfurl.NewService(
294 unfurlRepo,
295 unfurl.WithTimeout(10*time.Second),
296 unfurl.WithUserAgent("CovesBot/1.0 (+https://coves.social)"),
297 unfurl.WithCacheTTL(24*time.Hour),
298 )
299 log.Println("✅ Unfurl and blob services initialized")
300
301 // Initialize post service (with aggregator support)
302 postRepo := postgresRepo.NewPostRepository(db)
303 postService := posts.NewPostService(postRepo, communityService, aggregatorService, blobService, unfurlService, defaultPDS)
304
305 // Initialize vote repository (used by Jetstream consumer for indexing)
306 voteRepo := postgresRepo.NewVoteRepository(db)
307 log.Println("✅ Vote repository initialized (Jetstream indexing only)")
308
309 // Initialize comment repository (used by Jetstream consumer for indexing)
310 commentRepo := postgresRepo.NewCommentRepository(db)
311 log.Println("✅ Comment repository initialized (Jetstream indexing only)")
312
313 // Initialize comment service (for query API)
314 // Requires user and community repos for proper author/community hydration per lexicon
315 commentService := comments.NewCommentService(commentRepo, userRepo, postRepo, communityRepo)
316 log.Println("✅ Comment service initialized (with author/community hydration)")
317
318 // Initialize feed service
319 feedRepo := postgresRepo.NewCommunityFeedRepository(db, cursorSecret)
320 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
321 log.Println("✅ Feed service initialized")
322
323 // Initialize timeline service (home feed from subscribed communities)
324 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret)
325 timelineService := timeline.NewTimelineService(timelineRepo)
326 log.Println("✅ Timeline service initialized")
327
328 // Initialize discover service (public feed from all communities)
329 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret)
330 discoverService := discover.NewDiscoverService(discoverRepo)
331 log.Println("✅ Discover service initialized")
332
333 // Start Jetstream consumer for posts
334 // This consumer indexes posts created in community repositories via the firehose
335 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist
336 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL")
337 if postJetstreamURL == "" {
338 // Listen to post record creation events
339 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.post"
340 }
341
342 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService, db)
343 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL)
344
345 go func() {
346 if startErr := postJetstreamConnector.Start(ctx); startErr != nil {
347 log.Printf("Post Jetstream consumer stopped: %v", startErr)
348 }
349 }()
350
351 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL)
352 log.Println(" - Indexing: social.coves.community.post CREATE operations")
353 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented")
354
355 // Start Jetstream consumer for aggregators
356 // This consumer indexes aggregator service declarations and authorization records
357 // Following Bluesky's pattern for feed generators and labelers
358 // NOTE: Uses the same Jetstream as communities, just filtering different collections
359 aggregatorJetstreamURL := communityJetstreamURL
360 // Override if specific URL needed for testing
361 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" {
362 aggregatorJetstreamURL = envURL
363 } else if aggregatorJetstreamURL == "" {
364 // Fallback if community URL also not set
365 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization"
366 }
367
368 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo)
369 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL)
370
371 go func() {
372 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil {
373 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr)
374 }
375 }()
376
377 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL)
378 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)")
379 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)")
380
381 // Start Jetstream consumer for votes
382 // This consumer indexes votes from user repositories and updates post vote counts
383 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL")
384 if voteJetstreamURL == "" {
385 // Listen to vote record CREATE/DELETE events from user repositories
386 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.vote"
387 }
388
389 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
390 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL)
391
392 go func() {
393 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil {
394 log.Printf("Vote Jetstream consumer stopped: %v", startErr)
395 }
396 }()
397
398 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL)
399 log.Println(" - Indexing: social.coves.feed.vote CREATE/DELETE operations")
400 log.Println(" - Updating: Post vote counts atomically")
401
402 // Start Jetstream consumer for comments
403 // This consumer indexes comments from user repositories and updates parent counts
404 commentJetstreamURL := os.Getenv("COMMENT_JETSTREAM_URL")
405 if commentJetstreamURL == "" {
406 // Listen to comment record CREATE/UPDATE/DELETE events from user repositories
407 commentJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.feed.comment"
408 }
409
410 commentEventConsumer := jetstream.NewCommentEventConsumer(commentRepo, db)
411 commentJetstreamConnector := jetstream.NewCommentJetstreamConnector(commentEventConsumer, commentJetstreamURL)
412
413 go func() {
414 if startErr := commentJetstreamConnector.Start(ctx); startErr != nil {
415 log.Printf("Comment Jetstream consumer stopped: %v", startErr)
416 }
417 }()
418
419 log.Printf("Started Jetstream comment consumer: %s", commentJetstreamURL)
420 log.Println(" - Indexing: social.coves.feed.comment CREATE/UPDATE/DELETE operations")
421 log.Println(" - Updating: Post comment counts and comment reply counts atomically")
422
423 // Register XRPC routes
424 routes.RegisterUserRoutes(r, userService)
425 routes.RegisterCommunityRoutes(r, communityService, authMiddleware)
426 log.Println("Community XRPC endpoints registered with OAuth authentication")
427
428 routes.RegisterPostRoutes(r, postService, authMiddleware)
429 log.Println("Post XRPC endpoints registered with OAuth authentication")
430
431 // Vote write endpoints removed - clients write directly to their PDS
432 // The AppView indexes votes from Jetstream (see vote consumer above)
433
434 routes.RegisterCommunityFeedRoutes(r, feedService)
435 log.Println("Feed XRPC endpoints registered (public, no auth required)")
436
437 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware)
438 log.Println("Timeline XRPC endpoints registered (requires authentication)")
439
440 routes.RegisterDiscoverRoutes(r, discoverService)
441 log.Println("Discover XRPC endpoints registered (public, no auth required)")
442
443 routes.RegisterAggregatorRoutes(r, aggregatorService)
444 log.Println("Aggregator XRPC endpoints registered (query endpoints public)")
445
446 // Comment query API - supports optional authentication for viewer state
447 // Stricter rate limiting for expensive nested comment queries
448 commentRateLimiter := middleware.NewRateLimiter(20, 1*time.Minute)
449 commentServiceAdapter := commentsAPI.NewServiceAdapter(commentService)
450 commentHandler := commentsAPI.NewGetCommentsHandler(commentServiceAdapter)
451 r.Handle(
452 "/xrpc/social.coves.community.comment.getComments",
453 commentRateLimiter.Middleware(
454 commentsAPI.OptionalAuthMiddleware(authMiddleware, commentHandler.HandleGetComments),
455 ),
456 )
457 log.Println("✅ Comment query API registered (20 req/min rate limit)")
458 log.Println(" - GET /xrpc/social.coves.community.comment.getComments")
459
460 r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
461 w.WriteHeader(http.StatusOK)
462 if _, err := w.Write([]byte("OK")); err != nil {
463 log.Printf("Failed to write health check response: %v", err)
464 }
465 })
466
467 port := os.Getenv("APPVIEW_PORT")
468 if port == "" {
469 port = "8081" // Match .env.dev default
470 }
471
472 fmt.Printf("Coves AppView starting on port %s\n", port)
473 fmt.Printf("Default PDS: %s\n", defaultPDS)
474 log.Fatal(http.ListenAndServe(":"+port, r))
475}
476
// authenticateWithPDS creates a session on the PDS and returns its access token.
//
// It POSTs handle and password as JSON to
// {pdsURL}/xrpc/com.atproto.server.createSession and returns the accessJwt
// field of the JSON response. A trailing slash on pdsURL is tolerated.
//
// An error is returned when the request cannot be built or sent (including
// when the PDS does not answer within the client timeout), when the PDS
// replies with any status other than 200 (the message includes up to 4 KiB of
// the response body), when the response is not valid JSON, or when the
// response carries no access token.
func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
	type createSessionRequest struct {
		Identifier string `json:"identifier"`
		Password   string `json:"password"`
	}

	type createSessionResponse struct {
		DID       string `json:"did"`
		Handle    string `json:"handle"`
		AccessJwt string `json:"accessJwt"`
	}

	// Cap on how much of an error response is echoed into the error message.
	const maxErrorBody = 4 << 10

	reqBody, err := json.Marshal(createSessionRequest{
		Identifier: handle,
		Password:   password,
	})
	if err != nil {
		return "", fmt.Errorf("failed to marshal request: %w", err)
	}

	// http.Post goes through http.DefaultClient, which has no timeout; an
	// unresponsive PDS would block the caller forever. Use a bounded client.
	client := &http.Client{Timeout: 15 * time.Second}

	// Trim trailing slashes so "http://host/" does not yield "//xrpc/...".
	endpoint := strings.TrimRight(pdsURL, "/") + "/xrpc/com.atproto.server.createSession"

	resp, err := client.Post(endpoint, "application/json", bytes.NewReader(reqBody))
	if err != nil {
		return "", fmt.Errorf("failed to call PDS: %w", err)
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			log.Printf("Failed to close response body: %v", closeErr)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxErrorBody))
		if readErr != nil {
			return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr)
		}
		return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
	}

	var session createSessionResponse
	if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
		return "", fmt.Errorf("failed to decode response: %w", err)
	}

	// A 200 response without a token must not be reported as success: callers
	// would otherwise store an empty token and believe they are authenticated.
	if session.AccessJwt == "" {
		return "", fmt.Errorf("PDS response did not include an access token")
	}

	return session.AccessJwt, nil
}