A community-based topic aggregation platform built on atproto
1package main
2
3import (
4 "Coves/internal/api/middleware"
5 "Coves/internal/api/routes"
6 "Coves/internal/atproto/auth"
7 "Coves/internal/atproto/identity"
8 "Coves/internal/atproto/jetstream"
9 "Coves/internal/core/aggregators"
10 "Coves/internal/core/communities"
11 "Coves/internal/core/communityFeeds"
12 "Coves/internal/core/discover"
13 "Coves/internal/core/posts"
14 "Coves/internal/core/timeline"
15 "Coves/internal/core/users"
16 "Coves/internal/core/votes"
17 "bytes"
18 "context"
19 "database/sql"
20 "encoding/json"
21 "fmt"
22 "io"
23 "log"
24 "net/http"
25 "os"
26 "strings"
27 "time"
28
29 "github.com/go-chi/chi/v5"
30 chiMiddleware "github.com/go-chi/chi/v5/middleware"
31 _ "github.com/lib/pq"
32 "github.com/pressly/goose/v3"
33
34 postgresRepo "Coves/internal/db/postgres"
35)
36
37func main() {
38 // Database configuration (AppView database)
39 dbURL := os.Getenv("DATABASE_URL")
40 if dbURL == "" {
41 // Use dev database from .env.dev
42 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
43 }
44
45 // Default PDS URL for this Coves instance (supports self-hosting)
46 defaultPDS := os.Getenv("PDS_URL")
47 if defaultPDS == "" {
48 defaultPDS = "http://localhost:3001" // Local dev PDS
49 }
50
51 // Cursor secret for HMAC signing (prevents cursor manipulation)
52 cursorSecret := os.Getenv("CURSOR_SECRET")
53 if cursorSecret == "" {
54 // Generate a random secret if not set (dev mode)
55 // IMPORTANT: In production, set CURSOR_SECRET to a strong random value
56 cursorSecret = "dev-cursor-secret-change-in-production"
57 log.Println("⚠️ WARNING: Using default cursor secret. Set CURSOR_SECRET env var in production!")
58 }
59
60 db, err := sql.Open("postgres", dbURL)
61 if err != nil {
62 log.Fatal("Failed to connect to database:", err)
63 }
64 defer func() {
65 if closeErr := db.Close(); closeErr != nil {
66 log.Printf("Failed to close database connection: %v", closeErr)
67 }
68 }()
69
70 if err = db.Ping(); err != nil {
71 log.Fatal("Failed to ping database:", err)
72 }
73
74 log.Println("Connected to AppView database")
75
76 // Run migrations
77 if err = goose.SetDialect("postgres"); err != nil {
78 log.Fatal("Failed to set goose dialect:", err)
79 }
80
81 if err = goose.Up(db, "internal/db/migrations"); err != nil {
82 log.Fatal("Failed to run migrations:", err)
83 }
84
85 log.Println("Migrations completed successfully")
86
87 r := chi.NewRouter()
88
89 r.Use(chiMiddleware.Logger)
90 r.Use(chiMiddleware.Recoverer)
91 r.Use(chiMiddleware.RequestID)
92
93 // Rate limiting: 100 requests per minute per IP
94 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
95 r.Use(rateLimiter.Middleware)
96
97 // Initialize identity resolver
98 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC
99 // directory as DID registration to ensure E2E tests work without hitting
100 // the production plc.directory
101 identityConfig := identity.DefaultConfig()
102
103 isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
104 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
105 if plcDirectoryURL == "" {
106 plcDirectoryURL = "https://plc.directory" // Default to production PLC
107 }
108
109 // In dev mode, use PLC_DIRECTORY_URL for identity resolution
110 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL
111 if isDevEnv {
112 identityConfig.PLCURL = plcDirectoryURL
113 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL)
114 } else {
115 // Production: Allow separate IDENTITY_PLC_URL for read operations
116 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" {
117 identityConfig.PLCURL = identityPLCURL
118 } else {
119 identityConfig.PLCURL = plcDirectoryURL
120 }
121 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL)
122 }
123
124 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
125 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil {
126 identityConfig.CacheTTL = duration
127 }
128 }
129
130 identityResolver := identity.NewResolver(db, identityConfig)
131
132 // Initialize atProto auth middleware for JWT validation
133 // Phase 1: Set skipVerify=true to test JWT parsing only
134 // Phase 2: Set skipVerify=false to enable full signature verification
135 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true"
136 if skipVerify {
137 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)")
138 log.Println(" Set AUTH_SKIP_VERIFY=false for production")
139 }
140
141 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour
142 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL)
143 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify)
144 log.Println("✅ atProto auth middleware initialized")
145
146 // Initialize repositories and services
147 userRepo := postgresRepo.NewUserRepository(db)
148 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
149
150 communityRepo := postgresRepo.NewCommunityRepository(db)
151
152 // V2.0: PDS-managed DID generation
153 // Community DIDs and keys are generated entirely by the PDS
154 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach)
155
156 instanceDID := os.Getenv("INSTANCE_DID")
157 if instanceDID == "" {
158 instanceDID = "did:web:coves.social" // Default for development
159 }
160
161 // V2: Extract instance domain for community handles
162 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security
163 // We cannot allow arbitrary domains to prevent impersonation attacks
164 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance
165 //
166 // SECURITY: did:web domain verification is implemented in the Jetstream consumer
167 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim()
168 // Communities with mismatched hostedBy domains are rejected during indexing
169 var instanceDomain string
170 if strings.HasPrefix(instanceDID, "did:web:") {
171 // Extract domain from did:web (this is the authoritative source)
172 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
173 } else {
174 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN
175 instanceDomain = os.Getenv("INSTANCE_DOMAIN")
176 if instanceDomain == "" {
177 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs")
178 }
179 }
180
181 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID)
182
183 // V2.0: Initialize PDS account provisioner for communities (simplified)
184 // PDS handles all DID and key generation - no Coves-side cryptography needed
185 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS)
186 log.Printf("✅ Community provisioner initialized (PDS-managed keys)")
187 log.Printf(" - Communities will be created at: %s", defaultPDS)
188 log.Printf(" - PDS will generate and manage all DIDs and keys")
189
190 // Initialize community service (no longer needs didGenerator directly)
191 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner)
192
193 // Authenticate Coves instance with PDS to enable community record writes
194 // The instance needs a PDS account to write community records it owns
195 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
196 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
197 if pdsHandle != "" && pdsPassword != "" {
198 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
199 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
200 if authErr != nil {
201 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr)
202 log.Println("Community creation will fail until PDS authentication is configured")
203 } else {
204 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
205 svc.SetPDSAccessToken(accessToken)
206 log.Println("✓ Coves instance authenticated with PDS")
207 }
208 }
209 } else {
210 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
211 log.Println("Community creation via write-forward is disabled")
212 }
213
214 // Start Jetstream consumer for read-forward user indexing
215 jetstreamURL := os.Getenv("JETSTREAM_URL")
216 if jetstreamURL == "" {
217 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
218 }
219
220 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
221
222 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
223 ctx := context.Background()
224 go func() {
225 if startErr := userConsumer.Start(ctx); startErr != nil {
226 log.Printf("Jetstream consumer stopped: %v", startErr)
227 }
228 }()
229
230 log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
231
232 // Start Jetstream consumer for community events (profiles and subscriptions)
233 // This consumer indexes:
234 // 1. Community profiles (social.coves.community.profile) - in community's own repo
235 // 2. User subscriptions (social.coves.community.subscription) - in user's repo
236 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL")
237 if communityJetstreamURL == "" {
238 // Local Jetstream for communities - filter to our instance's collections
239 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe)
240 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures
241 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription"
242 }
243
244 // Initialize community event consumer with did:web verification
245 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true"
246 if skipDIDWebVerification {
247 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)")
248 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production")
249 }
250
251 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification)
252 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL)
253
254 go func() {
255 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil {
256 log.Printf("Community Jetstream consumer stopped: %v", startErr)
257 }
258 }()
259
260 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL)
261 log.Println(" - Indexing: social.coves.community.profile (community profiles)")
262 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)")
263
264 // Start JWKS cache cleanup background job
265 go func() {
266 ticker := time.NewTicker(1 * time.Hour)
267 defer ticker.Stop()
268 for range ticker.C {
269 jwksFetcher.CleanupExpiredCache()
270 log.Println("JWKS cache cleanup completed")
271 }
272 }()
273
274 log.Println("Started JWKS cache cleanup background job (runs hourly)")
275
276 // Initialize aggregator service
277 aggregatorRepo := postgresRepo.NewAggregatorRepository(db)
278 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService)
279 log.Println("✅ Aggregator service initialized")
280
281 // Initialize post service (with aggregator support)
282 postRepo := postgresRepo.NewPostRepository(db)
283 postService := posts.NewPostService(postRepo, communityService, aggregatorService, defaultPDS)
284
285 // Initialize vote service
286 voteRepo := postgresRepo.NewVoteRepository(db)
287 voteService := votes.NewVoteService(voteRepo, postRepo, defaultPDS)
288 log.Println("✅ Vote service initialized")
289
290 // Initialize feed service
291 feedRepo := postgresRepo.NewCommunityFeedRepository(db)
292 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
293 log.Println("✅ Feed service initialized")
294
295 // Initialize timeline service (home feed from subscribed communities)
296 timelineRepo := postgresRepo.NewTimelineRepository(db, cursorSecret)
297 timelineService := timeline.NewTimelineService(timelineRepo)
298 log.Println("✅ Timeline service initialized")
299
300 // Initialize discover service (public feed from all communities)
301 discoverRepo := postgresRepo.NewDiscoverRepository(db, cursorSecret)
302 discoverService := discover.NewDiscoverService(discoverRepo)
303 log.Println("✅ Discover service initialized")
304
305 // Start Jetstream consumer for posts
306 // This consumer indexes posts created in community repositories via the firehose
307 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist
308 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL")
309 if postJetstreamURL == "" {
310 // Listen to post record creation events
311 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.post.record"
312 }
313
314 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
315 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL)
316
317 go func() {
318 if startErr := postJetstreamConnector.Start(ctx); startErr != nil {
319 log.Printf("Post Jetstream consumer stopped: %v", startErr)
320 }
321 }()
322
323 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL)
324 log.Println(" - Indexing: social.coves.post.record CREATE operations")
325 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented")
326
327 // Start Jetstream consumer for aggregators
328 // This consumer indexes aggregator service declarations and authorization records
329 // Following Bluesky's pattern for feed generators and labelers
330 // NOTE: Uses the same Jetstream as communities, just filtering different collections
331 aggregatorJetstreamURL := communityJetstreamURL
332 // Override if specific URL needed for testing
333 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" {
334 aggregatorJetstreamURL = envURL
335 } else if aggregatorJetstreamURL == "" {
336 // Fallback if community URL also not set
337 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization"
338 }
339
340 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo)
341 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL)
342
343 go func() {
344 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil {
345 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr)
346 }
347 }()
348
349 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL)
350 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)")
351 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)")
352
353 // Start Jetstream consumer for votes
354 // This consumer indexes votes from user repositories and updates post vote counts
355 voteJetstreamURL := os.Getenv("VOTE_JETSTREAM_URL")
356 if voteJetstreamURL == "" {
357 // Listen to vote record CREATE/DELETE events from user repositories
358 voteJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.interaction.vote"
359 }
360
361 voteEventConsumer := jetstream.NewVoteEventConsumer(voteRepo, userService, db)
362 voteJetstreamConnector := jetstream.NewVoteJetstreamConnector(voteEventConsumer, voteJetstreamURL)
363
364 go func() {
365 if startErr := voteJetstreamConnector.Start(ctx); startErr != nil {
366 log.Printf("Vote Jetstream consumer stopped: %v", startErr)
367 }
368 }()
369
370 log.Printf("Started Jetstream vote consumer: %s", voteJetstreamURL)
371 log.Println(" - Indexing: social.coves.interaction.vote CREATE/DELETE operations")
372 log.Println(" - Updating: Post vote counts atomically")
373
374 // Register XRPC routes
375 routes.RegisterUserRoutes(r, userService)
376 routes.RegisterCommunityRoutes(r, communityService, authMiddleware)
377 log.Println("Community XRPC endpoints registered with OAuth authentication")
378
379 routes.RegisterPostRoutes(r, postService, authMiddleware)
380 log.Println("Post XRPC endpoints registered with OAuth authentication")
381
382 routes.RegisterVoteRoutes(r, voteService, authMiddleware)
383 log.Println("Vote XRPC endpoints registered with OAuth authentication")
384
385 routes.RegisterCommunityFeedRoutes(r, feedService)
386 log.Println("Feed XRPC endpoints registered (public, no auth required)")
387
388 routes.RegisterTimelineRoutes(r, timelineService, authMiddleware)
389 log.Println("Timeline XRPC endpoints registered (requires authentication)")
390
391 routes.RegisterDiscoverRoutes(r, discoverService)
392 log.Println("Discover XRPC endpoints registered (public, no auth required)")
393
394 routes.RegisterAggregatorRoutes(r, aggregatorService)
395 log.Println("Aggregator XRPC endpoints registered (query endpoints public)")
396
397 r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
398 w.WriteHeader(http.StatusOK)
399 if _, err := w.Write([]byte("OK")); err != nil {
400 log.Printf("Failed to write health check response: %v", err)
401 }
402 })
403
404 port := os.Getenv("APPVIEW_PORT")
405 if port == "" {
406 port = "8081" // Match .env.dev default
407 }
408
409 fmt.Printf("Coves AppView starting on port %s\n", port)
410 fmt.Printf("Default PDS: %s\n", defaultPDS)
411 log.Fatal(http.ListenAndServe(":"+port, r))
412}
413
// authenticateWithPDS creates a session on the PDS by POSTing to
// /xrpc/com.atproto.server.createSession and returns the access JWT from the
// response.
//
// pdsURL is the base URL of the PDS (trailing slashes are tolerated), handle is
// sent as the session identifier and password as the account password.
//
// An error is returned when the request cannot be built or sent, when the PDS
// does not answer within the request timeout, when it answers with a status
// other than 200, when the body is not valid JSON, or when the response
// carries no access token.
func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
	const (
		// requestTimeout bounds the whole exchange so an unresponsive PDS
		// cannot block the caller forever (http.DefaultClient has no timeout).
		requestTimeout = 30 * time.Second
		// maxErrorBody caps how much of a failure response is read into the
		// returned error message.
		maxErrorBody = 4 << 10
	)

	type createSessionRequest struct {
		Identifier string `json:"identifier"`
		Password   string `json:"password"`
	}

	type createSessionResponse struct {
		DID       string `json:"did"`
		Handle    string `json:"handle"`
		AccessJwt string `json:"accessJwt"`
	}

	reqBody, err := json.Marshal(createSessionRequest{
		Identifier: handle,
		Password:   password,
	})
	if err != nil {
		return "", fmt.Errorf("failed to marshal request: %w", err)
	}

	// Trim trailing slashes so "http://host/" does not become "http://host//xrpc/...".
	endpoint := strings.TrimRight(pdsURL, "/") + "/xrpc/com.atproto.server.createSession"

	client := &http.Client{Timeout: requestTimeout}
	resp, err := client.Post(endpoint, "application/json", bytes.NewReader(reqBody))
	if err != nil {
		return "", fmt.Errorf("failed to call PDS: %w", err)
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			log.Printf("Failed to close response body: %v", closeErr)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxErrorBody))
		if readErr != nil {
			return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr)
		}
		return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
	}

	var session createSessionResponse
	if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
		return "", fmt.Errorf("failed to decode response: %w", err)
	}

	// A 200 without a token is not a usable session; report it instead of
	// handing the caller an empty string with a nil error.
	if session.AccessJwt == "" {
		return "", fmt.Errorf("PDS response did not include an access token")
	}

	return session.AccessJwt, nil
}