A community based topic aggregation platform built on atproto
1package main
2
3import (
4 "Coves/internal/api/middleware"
5 "Coves/internal/api/routes"
6 "Coves/internal/atproto/auth"
7 "Coves/internal/atproto/identity"
8 "Coves/internal/atproto/jetstream"
9 "Coves/internal/core/aggregators"
10 "Coves/internal/core/communities"
11 "Coves/internal/core/communityFeeds"
12 "Coves/internal/core/posts"
13 "Coves/internal/core/users"
14 "bytes"
15 "context"
16 "database/sql"
17 "encoding/json"
18 "fmt"
19 "io"
20 "log"
21 "net/http"
22 "os"
23 "strings"
24 "time"
25
26 "github.com/go-chi/chi/v5"
27 chiMiddleware "github.com/go-chi/chi/v5/middleware"
28 _ "github.com/lib/pq"
29 "github.com/pressly/goose/v3"
30
31 postgresRepo "Coves/internal/db/postgres"
32)
33
34func main() {
35 // Database configuration (AppView database)
36 dbURL := os.Getenv("DATABASE_URL")
37 if dbURL == "" {
38 // Use dev database from .env.dev
39 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
40 }
41
42 // Default PDS URL for this Coves instance (supports self-hosting)
43 defaultPDS := os.Getenv("PDS_URL")
44 if defaultPDS == "" {
45 defaultPDS = "http://localhost:3001" // Local dev PDS
46 }
47
48 db, err := sql.Open("postgres", dbURL)
49 if err != nil {
50 log.Fatal("Failed to connect to database:", err)
51 }
52 defer func() {
53 if closeErr := db.Close(); closeErr != nil {
54 log.Printf("Failed to close database connection: %v", closeErr)
55 }
56 }()
57
58 if err = db.Ping(); err != nil {
59 log.Fatal("Failed to ping database:", err)
60 }
61
62 log.Println("Connected to AppView database")
63
64 // Run migrations
65 if err = goose.SetDialect("postgres"); err != nil {
66 log.Fatal("Failed to set goose dialect:", err)
67 }
68
69 if err = goose.Up(db, "internal/db/migrations"); err != nil {
70 log.Fatal("Failed to run migrations:", err)
71 }
72
73 log.Println("Migrations completed successfully")
74
75 r := chi.NewRouter()
76
77 r.Use(chiMiddleware.Logger)
78 r.Use(chiMiddleware.Recoverer)
79 r.Use(chiMiddleware.RequestID)
80
81 // Rate limiting: 100 requests per minute per IP
82 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
83 r.Use(rateLimiter.Middleware)
84
85 // Initialize identity resolver
86 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC
87 // directory as DID registration to ensure E2E tests work without hitting
88 // the production plc.directory
89 identityConfig := identity.DefaultConfig()
90
91 isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
92 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
93 if plcDirectoryURL == "" {
94 plcDirectoryURL = "https://plc.directory" // Default to production PLC
95 }
96
97 // In dev mode, use PLC_DIRECTORY_URL for identity resolution
98 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL
99 if isDevEnv {
100 identityConfig.PLCURL = plcDirectoryURL
101 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL)
102 } else {
103 // Production: Allow separate IDENTITY_PLC_URL for read operations
104 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" {
105 identityConfig.PLCURL = identityPLCURL
106 } else {
107 identityConfig.PLCURL = plcDirectoryURL
108 }
109 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL)
110 }
111
112 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
113 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil {
114 identityConfig.CacheTTL = duration
115 }
116 }
117
118 identityResolver := identity.NewResolver(db, identityConfig)
119
120 // Initialize atProto auth middleware for JWT validation
121 // Phase 1: Set skipVerify=true to test JWT parsing only
122 // Phase 2: Set skipVerify=false to enable full signature verification
123 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true"
124 if skipVerify {
125 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)")
126 log.Println(" Set AUTH_SKIP_VERIFY=false for production")
127 }
128
129 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour
130 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL)
131 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify)
132 log.Println("✅ atProto auth middleware initialized")
133
134 // Initialize repositories and services
135 userRepo := postgresRepo.NewUserRepository(db)
136 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
137
138 communityRepo := postgresRepo.NewCommunityRepository(db)
139
140 // V2.0: PDS-managed DID generation
141 // Community DIDs and keys are generated entirely by the PDS
142 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach)
143
144 instanceDID := os.Getenv("INSTANCE_DID")
145 if instanceDID == "" {
146 instanceDID = "did:web:coves.social" // Default for development
147 }
148
149 // V2: Extract instance domain for community handles
150 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security
151 // We cannot allow arbitrary domains to prevent impersonation attacks
152 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance
153 //
154 // SECURITY: did:web domain verification is implemented in the Jetstream consumer
155 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim()
156 // Communities with mismatched hostedBy domains are rejected during indexing
157 var instanceDomain string
158 if strings.HasPrefix(instanceDID, "did:web:") {
159 // Extract domain from did:web (this is the authoritative source)
160 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
161 } else {
162 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN
163 instanceDomain = os.Getenv("INSTANCE_DOMAIN")
164 if instanceDomain == "" {
165 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs")
166 }
167 }
168
169 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID)
170
171 // V2.0: Initialize PDS account provisioner for communities (simplified)
172 // PDS handles all DID and key generation - no Coves-side cryptography needed
173 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS)
174 log.Printf("✅ Community provisioner initialized (PDS-managed keys)")
175 log.Printf(" - Communities will be created at: %s", defaultPDS)
176 log.Printf(" - PDS will generate and manage all DIDs and keys")
177
178 // Initialize community service (no longer needs didGenerator directly)
179 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner)
180
181 // Authenticate Coves instance with PDS to enable community record writes
182 // The instance needs a PDS account to write community records it owns
183 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
184 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
185 if pdsHandle != "" && pdsPassword != "" {
186 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
187 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
188 if authErr != nil {
189 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr)
190 log.Println("Community creation will fail until PDS authentication is configured")
191 } else {
192 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
193 svc.SetPDSAccessToken(accessToken)
194 log.Println("✓ Coves instance authenticated with PDS")
195 }
196 }
197 } else {
198 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
199 log.Println("Community creation via write-forward is disabled")
200 }
201
202 // Start Jetstream consumer for read-forward user indexing
203 jetstreamURL := os.Getenv("JETSTREAM_URL")
204 if jetstreamURL == "" {
205 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
206 }
207
208 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
209
210 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
211 ctx := context.Background()
212 go func() {
213 if startErr := userConsumer.Start(ctx); startErr != nil {
214 log.Printf("Jetstream consumer stopped: %v", startErr)
215 }
216 }()
217
218 log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
219
220 // Start Jetstream consumer for community events (profiles and subscriptions)
221 // This consumer indexes:
222 // 1. Community profiles (social.coves.community.profile) - in community's own repo
223 // 2. User subscriptions (social.coves.community.subscription) - in user's repo
224 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL")
225 if communityJetstreamURL == "" {
226 // Local Jetstream for communities - filter to our instance's collections
227 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe)
228 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures
229 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription"
230 }
231
232 // Initialize community event consumer with did:web verification
233 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true"
234 if skipDIDWebVerification {
235 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)")
236 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production")
237 }
238
239 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification)
240 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL)
241
242 go func() {
243 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil {
244 log.Printf("Community Jetstream consumer stopped: %v", startErr)
245 }
246 }()
247
248 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL)
249 log.Println(" - Indexing: social.coves.community.profile (community profiles)")
250 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)")
251
252 // Start JWKS cache cleanup background job
253 go func() {
254 ticker := time.NewTicker(1 * time.Hour)
255 defer ticker.Stop()
256 for range ticker.C {
257 jwksFetcher.CleanupExpiredCache()
258 log.Println("JWKS cache cleanup completed")
259 }
260 }()
261
262 log.Println("Started JWKS cache cleanup background job (runs hourly)")
263
264 // Initialize aggregator service
265 aggregatorRepo := postgresRepo.NewAggregatorRepository(db)
266 aggregatorService := aggregators.NewAggregatorService(aggregatorRepo, communityService)
267 log.Println("✅ Aggregator service initialized")
268
269 // Initialize post service (with aggregator support)
270 postRepo := postgresRepo.NewPostRepository(db)
271 postService := posts.NewPostService(postRepo, communityService, aggregatorService, defaultPDS)
272
273 // Initialize feed service
274 feedRepo := postgresRepo.NewCommunityFeedRepository(db)
275 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
276 log.Println("✅ Feed service initialized")
277
278 // Start Jetstream consumer for posts
279 // This consumer indexes posts created in community repositories via the firehose
280 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist
281 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL")
282 if postJetstreamURL == "" {
283 // Listen to post record creation events
284 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.post.record"
285 }
286
287 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
288 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL)
289
290 go func() {
291 if startErr := postJetstreamConnector.Start(ctx); startErr != nil {
292 log.Printf("Post Jetstream consumer stopped: %v", startErr)
293 }
294 }()
295
296 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL)
297 log.Println(" - Indexing: social.coves.post.record CREATE operations")
298 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented")
299
300 // Start Jetstream consumer for aggregators
301 // This consumer indexes aggregator service declarations and authorization records
302 // Following Bluesky's pattern for feed generators and labelers
303 // NOTE: Uses the same Jetstream as communities, just filtering different collections
304 aggregatorJetstreamURL := communityJetstreamURL
305 // Override if specific URL needed for testing
306 if envURL := os.Getenv("AGGREGATOR_JETSTREAM_URL"); envURL != "" {
307 aggregatorJetstreamURL = envURL
308 } else if aggregatorJetstreamURL == "" {
309 // Fallback if community URL also not set
310 aggregatorJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.aggregator.service&wantedCollections=social.coves.aggregator.authorization"
311 }
312
313 aggregatorEventConsumer := jetstream.NewAggregatorEventConsumer(aggregatorRepo)
314 aggregatorJetstreamConnector := jetstream.NewAggregatorJetstreamConnector(aggregatorEventConsumer, aggregatorJetstreamURL)
315
316 go func() {
317 if startErr := aggregatorJetstreamConnector.Start(ctx); startErr != nil {
318 log.Printf("Aggregator Jetstream consumer stopped: %v", startErr)
319 }
320 }()
321
322 log.Printf("Started Jetstream aggregator consumer: %s", aggregatorJetstreamURL)
323 log.Println(" - Indexing: social.coves.aggregator.service (service declarations)")
324 log.Println(" - Indexing: social.coves.aggregator.authorization (authorization records)")
325
326 // Register XRPC routes
327 routes.RegisterUserRoutes(r, userService)
328 routes.RegisterCommunityRoutes(r, communityService, authMiddleware)
329 log.Println("Community XRPC endpoints registered with OAuth authentication")
330
331 routes.RegisterPostRoutes(r, postService, authMiddleware)
332 log.Println("Post XRPC endpoints registered with OAuth authentication")
333
334 routes.RegisterCommunityFeedRoutes(r, feedService)
335 log.Println("Feed XRPC endpoints registered (public, no auth required)")
336
337 routes.RegisterAggregatorRoutes(r, aggregatorService)
338 log.Println("Aggregator XRPC endpoints registered (query endpoints public)")
339
340 r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
341 w.WriteHeader(http.StatusOK)
342 if _, err := w.Write([]byte("OK")); err != nil {
343 log.Printf("Failed to write health check response: %v", err)
344 }
345 })
346
347 port := os.Getenv("APPVIEW_PORT")
348 if port == "" {
349 port = "8081" // Match .env.dev default
350 }
351
352 fmt.Printf("Coves AppView starting on port %s\n", port)
353 fmt.Printf("Default PDS: %s\n", defaultPDS)
354 log.Fatal(http.ListenAndServe(":"+port, r))
355}
356
357// authenticateWithPDS creates a session on the PDS and returns an access token
358func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
359 type CreateSessionRequest struct {
360 Identifier string `json:"identifier"`
361 Password string `json:"password"`
362 }
363
364 type CreateSessionResponse struct {
365 DID string `json:"did"`
366 Handle string `json:"handle"`
367 AccessJwt string `json:"accessJwt"`
368 }
369
370 reqBody, err := json.Marshal(CreateSessionRequest{
371 Identifier: handle,
372 Password: password,
373 })
374 if err != nil {
375 return "", fmt.Errorf("failed to marshal request: %w", err)
376 }
377
378 resp, err := http.Post(
379 pdsURL+"/xrpc/com.atproto.server.createSession",
380 "application/json",
381 bytes.NewReader(reqBody),
382 )
383 if err != nil {
384 return "", fmt.Errorf("failed to call PDS: %w", err)
385 }
386 defer func() {
387 if closeErr := resp.Body.Close(); closeErr != nil {
388 log.Printf("Failed to close response body: %v", closeErr)
389 }
390 }()
391
392 if resp.StatusCode != http.StatusOK {
393 body, readErr := io.ReadAll(resp.Body)
394 if readErr != nil {
395 return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr)
396 }
397 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
398 }
399
400 var session CreateSessionResponse
401 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
402 return "", fmt.Errorf("failed to decode response: %w", err)
403 }
404
405 return session.AccessJwt, nil
406}