A community-based topic aggregation platform built on atproto
1package main
2
3import (
4 "Coves/internal/api/middleware"
5 "Coves/internal/api/routes"
6 "Coves/internal/atproto/auth"
7 "Coves/internal/atproto/identity"
8 "Coves/internal/atproto/jetstream"
9 "Coves/internal/core/communities"
10 "Coves/internal/core/communityFeeds"
11 "Coves/internal/core/posts"
12 "Coves/internal/core/users"
13 "bytes"
14 "context"
15 "database/sql"
16 "encoding/json"
17 "fmt"
18 "io"
19 "log"
20 "net/http"
21 "os"
22 "strings"
23 "time"
24
25 "github.com/go-chi/chi/v5"
26 chiMiddleware "github.com/go-chi/chi/v5/middleware"
27 _ "github.com/lib/pq"
28 "github.com/pressly/goose/v3"
29
30 postgresRepo "Coves/internal/db/postgres"
31)
32
33func main() {
34 // Database configuration (AppView database)
35 dbURL := os.Getenv("DATABASE_URL")
36 if dbURL == "" {
37 // Use dev database from .env.dev
38 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
39 }
40
41 // Default PDS URL for this Coves instance (supports self-hosting)
42 defaultPDS := os.Getenv("PDS_URL")
43 if defaultPDS == "" {
44 defaultPDS = "http://localhost:3001" // Local dev PDS
45 }
46
47 db, err := sql.Open("postgres", dbURL)
48 if err != nil {
49 log.Fatal("Failed to connect to database:", err)
50 }
51 defer func() {
52 if closeErr := db.Close(); closeErr != nil {
53 log.Printf("Failed to close database connection: %v", closeErr)
54 }
55 }()
56
57 if err = db.Ping(); err != nil {
58 log.Fatal("Failed to ping database:", err)
59 }
60
61 log.Println("Connected to AppView database")
62
63 // Run migrations
64 if err = goose.SetDialect("postgres"); err != nil {
65 log.Fatal("Failed to set goose dialect:", err)
66 }
67
68 if err = goose.Up(db, "internal/db/migrations"); err != nil {
69 log.Fatal("Failed to run migrations:", err)
70 }
71
72 log.Println("Migrations completed successfully")
73
74 r := chi.NewRouter()
75
76 r.Use(chiMiddleware.Logger)
77 r.Use(chiMiddleware.Recoverer)
78 r.Use(chiMiddleware.RequestID)
79
80 // Rate limiting: 100 requests per minute per IP
81 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
82 r.Use(rateLimiter.Middleware)
83
84 // Initialize identity resolver
85 // IMPORTANT: In dev mode, identity resolution MUST use the same local PLC
86 // directory as DID registration to ensure E2E tests work without hitting
87 // the production plc.directory
88 identityConfig := identity.DefaultConfig()
89
90 isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
91 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
92 if plcDirectoryURL == "" {
93 plcDirectoryURL = "https://plc.directory" // Default to production PLC
94 }
95
96 // In dev mode, use PLC_DIRECTORY_URL for identity resolution
97 // In prod mode, use IDENTITY_PLC_URL if set, otherwise PLC_DIRECTORY_URL
98 if isDevEnv {
99 identityConfig.PLCURL = plcDirectoryURL
100 log.Printf("🧪 DEV MODE: Identity resolver will use local PLC: %s", plcDirectoryURL)
101 } else {
102 // Production: Allow separate IDENTITY_PLC_URL for read operations
103 if identityPLCURL := os.Getenv("IDENTITY_PLC_URL"); identityPLCURL != "" {
104 identityConfig.PLCURL = identityPLCURL
105 } else {
106 identityConfig.PLCURL = plcDirectoryURL
107 }
108 log.Printf("✅ PRODUCTION MODE: Identity resolver using PLC: %s", identityConfig.PLCURL)
109 }
110
111 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
112 if duration, parseErr := time.ParseDuration(cacheTTL); parseErr == nil {
113 identityConfig.CacheTTL = duration
114 }
115 }
116
117 identityResolver := identity.NewResolver(db, identityConfig)
118
119 // Initialize atProto auth middleware for JWT validation
120 // Phase 1: Set skipVerify=true to test JWT parsing only
121 // Phase 2: Set skipVerify=false to enable full signature verification
122 skipVerify := os.Getenv("AUTH_SKIP_VERIFY") == "true"
123 if skipVerify {
124 log.Println("⚠️ WARNING: JWT signature verification is DISABLED (Phase 1 testing)")
125 log.Println(" Set AUTH_SKIP_VERIFY=false for production")
126 }
127
128 jwksCacheTTL := 1 * time.Hour // Cache public keys for 1 hour
129 jwksFetcher := auth.NewCachedJWKSFetcher(jwksCacheTTL)
130 authMiddleware := middleware.NewAtProtoAuthMiddleware(jwksFetcher, skipVerify)
131 log.Println("✅ atProto auth middleware initialized")
132
133 // Initialize repositories and services
134 userRepo := postgresRepo.NewUserRepository(db)
135 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
136
137 communityRepo := postgresRepo.NewCommunityRepository(db)
138
139 // V2.0: PDS-managed DID generation
140 // Community DIDs and keys are generated entirely by the PDS
141 // No Coves-side DID generator needed (reserved for future V2.1 hybrid approach)
142
143 instanceDID := os.Getenv("INSTANCE_DID")
144 if instanceDID == "" {
145 instanceDID = "did:web:coves.social" // Default for development
146 }
147
148 // V2: Extract instance domain for community handles
149 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security
150 // We cannot allow arbitrary domains to prevent impersonation attacks
151 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance
152 //
153 // SECURITY: did:web domain verification is implemented in the Jetstream consumer
154 // See: internal/atproto/jetstream/community_consumer.go - verifyHostedByClaim()
155 // Communities with mismatched hostedBy domains are rejected during indexing
156 var instanceDomain string
157 if strings.HasPrefix(instanceDID, "did:web:") {
158 // Extract domain from did:web (this is the authoritative source)
159 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
160 } else {
161 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN
162 instanceDomain = os.Getenv("INSTANCE_DOMAIN")
163 if instanceDomain == "" {
164 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs")
165 }
166 }
167
168 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID)
169
170 // V2.0: Initialize PDS account provisioner for communities (simplified)
171 // PDS handles all DID and key generation - no Coves-side cryptography needed
172 provisioner := communities.NewPDSAccountProvisioner(instanceDomain, defaultPDS)
173 log.Printf("✅ Community provisioner initialized (PDS-managed keys)")
174 log.Printf(" - Communities will be created at: %s", defaultPDS)
175 log.Printf(" - PDS will generate and manage all DIDs and keys")
176
177 // Initialize community service (no longer needs didGenerator directly)
178 communityService := communities.NewCommunityService(communityRepo, defaultPDS, instanceDID, instanceDomain, provisioner)
179
180 // Authenticate Coves instance with PDS to enable community record writes
181 // The instance needs a PDS account to write community records it owns
182 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
183 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
184 if pdsHandle != "" && pdsPassword != "" {
185 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
186 accessToken, authErr := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
187 if authErr != nil {
188 log.Printf("Warning: Failed to authenticate with PDS: %v", authErr)
189 log.Println("Community creation will fail until PDS authentication is configured")
190 } else {
191 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
192 svc.SetPDSAccessToken(accessToken)
193 log.Println("✓ Coves instance authenticated with PDS")
194 }
195 }
196 } else {
197 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
198 log.Println("Community creation via write-forward is disabled")
199 }
200
201 // Start Jetstream consumer for read-forward user indexing
202 jetstreamURL := os.Getenv("JETSTREAM_URL")
203 if jetstreamURL == "" {
204 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
205 }
206
207 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
208
209 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
210 ctx := context.Background()
211 go func() {
212 if startErr := userConsumer.Start(ctx); startErr != nil {
213 log.Printf("Jetstream consumer stopped: %v", startErr)
214 }
215 }()
216
217 log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
218
219 // Start Jetstream consumer for community events (profiles and subscriptions)
220 // This consumer indexes:
221 // 1. Community profiles (social.coves.community.profile) - in community's own repo
222 // 2. User subscriptions (social.coves.community.subscription) - in user's repo
223 communityJetstreamURL := os.Getenv("COMMUNITY_JETSTREAM_URL")
224 if communityJetstreamURL == "" {
225 // Local Jetstream for communities - filter to our instance's collections
226 // IMPORTANT: We listen to social.coves.community.subscription (not social.coves.community.subscribe)
227 // because subscriptions are RECORD TYPES in the communities namespace, not XRPC procedures
228 communityJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.community.profile&wantedCollections=social.coves.community.subscription"
229 }
230
231 // Initialize community event consumer with did:web verification
232 skipDIDWebVerification := os.Getenv("SKIP_DID_WEB_VERIFICATION") == "true"
233 if skipDIDWebVerification {
234 log.Println("⚠️ WARNING: did:web domain verification is DISABLED (dev mode)")
235 log.Println(" Set SKIP_DID_WEB_VERIFICATION=false for production")
236 }
237
238 communityEventConsumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, skipDIDWebVerification)
239 communityJetstreamConnector := jetstream.NewCommunityJetstreamConnector(communityEventConsumer, communityJetstreamURL)
240
241 go func() {
242 if startErr := communityJetstreamConnector.Start(ctx); startErr != nil {
243 log.Printf("Community Jetstream consumer stopped: %v", startErr)
244 }
245 }()
246
247 log.Printf("Started Jetstream community consumer: %s", communityJetstreamURL)
248 log.Println(" - Indexing: social.coves.community.profile (community profiles)")
249 log.Println(" - Indexing: social.coves.community.subscription (user subscriptions)")
250
251 // Start JWKS cache cleanup background job
252 go func() {
253 ticker := time.NewTicker(1 * time.Hour)
254 defer ticker.Stop()
255 for range ticker.C {
256 jwksFetcher.CleanupExpiredCache()
257 log.Println("JWKS cache cleanup completed")
258 }
259 }()
260
261 log.Println("Started JWKS cache cleanup background job (runs hourly)")
262
263 // Initialize post service
264 postRepo := postgresRepo.NewPostRepository(db)
265 postService := posts.NewPostService(postRepo, communityService, defaultPDS)
266
267 // Initialize feed service
268 feedRepo := postgresRepo.NewCommunityFeedRepository(db)
269 feedService := communityFeeds.NewCommunityFeedService(feedRepo, communityService)
270 log.Println("✅ Feed service initialized")
271
272 // Start Jetstream consumer for posts
273 // This consumer indexes posts created in community repositories via the firehose
274 // Currently handles only CREATE operations - UPDATE/DELETE deferred until those features exist
275 postJetstreamURL := os.Getenv("POST_JETSTREAM_URL")
276 if postJetstreamURL == "" {
277 // Listen to post record creation events
278 postJetstreamURL = "ws://localhost:6008/subscribe?wantedCollections=social.coves.post.record"
279 }
280
281 postEventConsumer := jetstream.NewPostEventConsumer(postRepo, communityRepo, userService)
282 postJetstreamConnector := jetstream.NewPostJetstreamConnector(postEventConsumer, postJetstreamURL)
283
284 go func() {
285 if startErr := postJetstreamConnector.Start(ctx); startErr != nil {
286 log.Printf("Post Jetstream consumer stopped: %v", startErr)
287 }
288 }()
289
290 log.Printf("Started Jetstream post consumer: %s", postJetstreamURL)
291 log.Println(" - Indexing: social.coves.post.record CREATE operations")
292 log.Println(" - UPDATE/DELETE indexing deferred until those features are implemented")
293
294 // Register XRPC routes
295 routes.RegisterUserRoutes(r, userService)
296 routes.RegisterCommunityRoutes(r, communityService, authMiddleware)
297 log.Println("Community XRPC endpoints registered with OAuth authentication")
298
299 routes.RegisterPostRoutes(r, postService, authMiddleware)
300 log.Println("Post XRPC endpoints registered with OAuth authentication")
301
302 routes.RegisterCommunityFeedRoutes(r, feedService)
303 log.Println("Feed XRPC endpoints registered (public, no auth required)")
304
305 r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
306 w.WriteHeader(http.StatusOK)
307 if _, err := w.Write([]byte("OK")); err != nil {
308 log.Printf("Failed to write health check response: %v", err)
309 }
310 })
311
312 port := os.Getenv("APPVIEW_PORT")
313 if port == "" {
314 port = "8081" // Match .env.dev default
315 }
316
317 fmt.Printf("Coves AppView starting on port %s\n", port)
318 fmt.Printf("Default PDS: %s\n", defaultPDS)
319 log.Fatal(http.ListenAndServe(":"+port, r))
320}
321
// authenticateWithPDS creates a session on the PDS by POSTing to
// /xrpc/com.atproto.server.createSession and returns the access token
// (accessJwt) from the response.
//
// pdsURL is the base URL of the PDS; trailing slashes are tolerated. handle is
// sent as the session "identifier" and password as "password".
//
// An error is returned when the request body cannot be encoded, the HTTP call
// fails or exceeds the request timeout, the PDS answers with a status other
// than 200 (the message includes a bounded prefix of the response body), the
// response cannot be decoded as JSON, or the response carries no access token.
func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
	const (
		// requestTimeout bounds the whole exchange. The package-level
		// http.Post uses a client with no timeout, so an unresponsive PDS
		// would otherwise block the caller forever.
		requestTimeout = 30 * time.Second
		// maxErrorBody caps how much of a failure response is copied into
		// the returned error message.
		maxErrorBody = 4 << 10
		// maxSessionBody caps how much of a success response is decoded.
		maxSessionBody = 1 << 20
	)

	type createSessionRequest struct {
		Identifier string `json:"identifier"`
		Password   string `json:"password"`
	}

	type createSessionResponse struct {
		DID       string `json:"did"`
		Handle    string `json:"handle"`
		AccessJwt string `json:"accessJwt"`
	}

	reqBody, err := json.Marshal(createSessionRequest{
		Identifier: handle,
		Password:   password,
	})
	if err != nil {
		return "", fmt.Errorf("failed to marshal request: %w", err)
	}

	// Strip trailing slashes so a base URL such as "http://pds/" does not
	// produce a "//xrpc/..." request path.
	endpoint := strings.TrimRight(pdsURL, "/") + "/xrpc/com.atproto.server.createSession"

	client := &http.Client{Timeout: requestTimeout}
	resp, err := client.Post(endpoint, "application/json", bytes.NewReader(reqBody))
	if err != nil {
		return "", fmt.Errorf("failed to call PDS: %w", err)
	}
	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			log.Printf("Failed to close response body: %v", closeErr)
		}
	}()

	if resp.StatusCode != http.StatusOK {
		body, readErr := io.ReadAll(io.LimitReader(resp.Body, maxErrorBody))
		if readErr != nil {
			return "", fmt.Errorf("PDS returned status %d and failed to read body: %w", resp.StatusCode, readErr)
		}
		return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
	}

	var session createSessionResponse
	if err := json.NewDecoder(io.LimitReader(resp.Body, maxSessionBody)).Decode(&session); err != nil {
		return "", fmt.Errorf("failed to decode response: %w", err)
	}

	// A 200 without a token is not a usable session; report it here rather
	// than handing the caller an empty credential.
	if session.AccessJwt == "" {
		return "", fmt.Errorf("PDS response did not include an access token")
	}

	return session.AccessJwt, nil
}