A community based topic aggregation platform built on atproto
1package main
2
3import (
4 "bytes"
5 "context"
6 "database/sql"
7 "encoding/json"
8 "fmt"
9 "io"
10 "log"
11 "net/http"
12 "os"
13 "strings"
14 "time"
15
16 "github.com/go-chi/chi/v5"
17 chiMiddleware "github.com/go-chi/chi/v5/middleware"
18 _ "github.com/lib/pq"
19 "github.com/pressly/goose/v3"
20
21 "Coves/internal/api/handlers/oauth"
22 "Coves/internal/api/middleware"
23 "Coves/internal/api/routes"
24 "Coves/internal/atproto/did"
25 "Coves/internal/atproto/identity"
26 "Coves/internal/atproto/jetstream"
27 "Coves/internal/core/communities"
28 oauthCore "Coves/internal/core/oauth"
29 "Coves/internal/core/users"
30 postgresRepo "Coves/internal/db/postgres"
31)
32
33func main() {
34 // Database configuration (AppView database)
35 dbURL := os.Getenv("DATABASE_URL")
36 if dbURL == "" {
37 // Use dev database from .env.dev
38 dbURL = "postgres://dev_user:dev_password@localhost:5435/coves_dev?sslmode=disable"
39 }
40
41 // Default PDS URL for this Coves instance (supports self-hosting)
42 defaultPDS := os.Getenv("PDS_URL")
43 if defaultPDS == "" {
44 defaultPDS = "http://localhost:3001" // Local dev PDS
45 }
46
47 db, err := sql.Open("postgres", dbURL)
48 if err != nil {
49 log.Fatal("Failed to connect to database:", err)
50 }
51 defer db.Close()
52
53 if err := db.Ping(); err != nil {
54 log.Fatal("Failed to ping database:", err)
55 }
56
57 log.Println("Connected to AppView database")
58
59 // Run migrations
60 if err := goose.SetDialect("postgres"); err != nil {
61 log.Fatal("Failed to set goose dialect:", err)
62 }
63
64 if err := goose.Up(db, "internal/db/migrations"); err != nil {
65 log.Fatal("Failed to run migrations:", err)
66 }
67
68 log.Println("Migrations completed successfully")
69
70 r := chi.NewRouter()
71
72 r.Use(chiMiddleware.Logger)
73 r.Use(chiMiddleware.Recoverer)
74 r.Use(chiMiddleware.RequestID)
75
76 // Rate limiting: 100 requests per minute per IP
77 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
78 r.Use(rateLimiter.Middleware)
79
80 // Initialize identity resolver
81 identityConfig := identity.DefaultConfig()
82 // Override from environment if set
83 if plcURL := os.Getenv("IDENTITY_PLC_URL"); plcURL != "" {
84 identityConfig.PLCURL = plcURL
85 }
86 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
87 if duration, err := time.ParseDuration(cacheTTL); err == nil {
88 identityConfig.CacheTTL = duration
89 }
90 }
91
92 identityResolver := identity.NewResolver(db, identityConfig)
93 log.Println("Identity resolver initialized with PLC:", identityConfig.PLCURL)
94
95 // Initialize OAuth session store
96 sessionStore := oauthCore.NewPostgresSessionStore(db)
97 log.Println("OAuth session store initialized")
98
99 // Initialize repositories and services
100 userRepo := postgresRepo.NewUserRepository(db)
101 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
102
103 communityRepo := postgresRepo.NewCommunityRepository(db)
104
105 // Initialize DID generator for communities
106 // IS_DEV_ENV=true: Generate did:plc:xxx without registering to PLC directory
107 // IS_DEV_ENV=false: Generate did:plc:xxx and register with PLC_DIRECTORY_URL
108 isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
109 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
110 if plcDirectoryURL == "" {
111 plcDirectoryURL = "https://plc.directory" // Default to Bluesky's PLC
112 }
113 didGenerator := did.NewGenerator(isDevEnv, plcDirectoryURL)
114 log.Printf("DID generator initialized (dev_mode=%v, plc_url=%s)", isDevEnv, plcDirectoryURL)
115
116 instanceDID := os.Getenv("INSTANCE_DID")
117 if instanceDID == "" {
118 instanceDID = "did:web:coves.social" // Default for development
119 }
120
121 // V2: Extract instance domain for community handles
122 // IMPORTANT: This MUST match the domain in INSTANCE_DID for security
123 // We cannot allow arbitrary domains to prevent impersonation attacks
124 // Example attack: !leagueoflegends@riotgames.com on a non-Riot instance
125 //
126 // TODO (Security - V2.1): Implement did:web domain verification
127 // Currently, any self-hoster can set INSTANCE_DID=did:web:nintendo.com without
128 // actually owning nintendo.com. This allows domain impersonation attacks.
129 // Solution: Verify domain ownership by fetching https://domain/.well-known/did.json
130 // and ensuring it matches the claimed DID. See: https://atproto.com/specs/did-web
131 // Alternatively, switch to did:plc for instance DIDs (cryptographically unique).
132 var instanceDomain string
133 if strings.HasPrefix(instanceDID, "did:web:") {
134 // Extract domain from did:web (this is the authoritative source)
135 instanceDomain = strings.TrimPrefix(instanceDID, "did:web:")
136 } else {
137 // For non-web DIDs (e.g., did:plc), require explicit INSTANCE_DOMAIN
138 instanceDomain = os.Getenv("INSTANCE_DOMAIN")
139 if instanceDomain == "" {
140 log.Fatal("INSTANCE_DOMAIN must be set for non-web DIDs")
141 }
142 }
143
144 log.Printf("Instance domain: %s (extracted from DID: %s)", instanceDomain, instanceDID)
145
146 // V2: Initialize PDS account provisioner for communities
147 provisioner := communities.NewPDSAccountProvisioner(userService, instanceDomain, defaultPDS)
148
149 communityService := communities.NewCommunityService(communityRepo, didGenerator, defaultPDS, instanceDID, instanceDomain, provisioner)
150
151 // Authenticate Coves instance with PDS to enable community record writes
152 // The instance needs a PDS account to write community records it owns
153 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
154 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
155 if pdsHandle != "" && pdsPassword != "" {
156 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
157 accessToken, err := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
158 if err != nil {
159 log.Printf("Warning: Failed to authenticate with PDS: %v", err)
160 log.Println("Community creation will fail until PDS authentication is configured")
161 } else {
162 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
163 svc.SetPDSAccessToken(accessToken)
164 log.Println("✓ Coves instance authenticated with PDS")
165 }
166 }
167 } else {
168 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
169 log.Println("Community creation via write-forward is disabled")
170 }
171
172 // Start Jetstream consumer for read-forward user indexing
173 jetstreamURL := os.Getenv("JETSTREAM_URL")
174 if jetstreamURL == "" {
175 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
176 }
177
178 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
179
180 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
181 ctx := context.Background()
182 go func() {
183 if err := userConsumer.Start(ctx); err != nil {
184 log.Printf("Jetstream consumer stopped: %v", err)
185 }
186 }()
187
188 log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
189
190 // Note: Community indexing happens through the same Jetstream firehose
191 // The CommunityEventConsumer is used by handlers when processing community-related events
192 // For now, community records are created via write-forward to PDS, then indexed when
193 // they appear in the firehose. A dedicated consumer can be added later if needed.
194 log.Println("Community event consumer initialized (processes events from firehose)")
195
196 // Start OAuth cleanup background job
197 go func() {
198 ticker := time.NewTicker(1 * time.Hour)
199 defer ticker.Stop()
200 for range ticker.C {
201 if pgStore, ok := sessionStore.(*oauthCore.PostgresSessionStore); ok {
202 _ = pgStore.CleanupExpiredRequests(ctx)
203 _ = pgStore.CleanupExpiredSessions(ctx)
204 log.Println("OAuth cleanup completed")
205 }
206 }
207 }()
208
209 log.Println("Started OAuth cleanup background job (runs hourly)")
210
211 // Initialize OAuth cookie store (singleton)
212 cookieSecret, err := oauth.GetEnvBase64OrPlain("OAUTH_COOKIE_SECRET")
213 if err != nil {
214 log.Fatalf("Failed to load OAUTH_COOKIE_SECRET: %v", err)
215 }
216 if cookieSecret == "" {
217 log.Fatal("OAUTH_COOKIE_SECRET not configured")
218 }
219
220 if err := oauth.InitCookieStore(cookieSecret); err != nil {
221 log.Fatalf("Failed to initialize cookie store: %v", err)
222 }
223
224 // Initialize OAuth handlers
225 loginHandler := oauth.NewLoginHandler(identityResolver, sessionStore)
226 callbackHandler := oauth.NewCallbackHandler(sessionStore)
227 logoutHandler := oauth.NewLogoutHandler(sessionStore)
228
229 // OAuth routes (public endpoints)
230 r.Post("/oauth/login", loginHandler.HandleLogin)
231 r.Get("/oauth/callback", callbackHandler.HandleCallback)
232 r.Post("/oauth/logout", logoutHandler.HandleLogout)
233 r.Get("/oauth/client-metadata.json", oauth.HandleClientMetadata)
234 r.Get("/oauth/jwks.json", oauth.HandleJWKS)
235
236 log.Println("OAuth endpoints registered")
237
238 // Register XRPC routes
239 routes.RegisterUserRoutes(r, userService)
240 routes.RegisterCommunityRoutes(r, communityService)
241 log.Println("Community XRPC endpoints registered")
242
243 r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
244 w.WriteHeader(http.StatusOK)
245 w.Write([]byte("OK"))
246 })
247
248 port := os.Getenv("APPVIEW_PORT")
249 if port == "" {
250 port = "8081" // Match .env.dev default
251 }
252
253 fmt.Printf("Coves AppView starting on port %s\n", port)
254 fmt.Printf("Default PDS: %s\n", defaultPDS)
255 log.Fatal(http.ListenAndServe(":"+port, r))
256}
257
258// authenticateWithPDS creates a session on the PDS and returns an access token
259func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
260 type CreateSessionRequest struct {
261 Identifier string `json:"identifier"`
262 Password string `json:"password"`
263 }
264
265 type CreateSessionResponse struct {
266 DID string `json:"did"`
267 Handle string `json:"handle"`
268 AccessJwt string `json:"accessJwt"`
269 }
270
271 reqBody, err := json.Marshal(CreateSessionRequest{
272 Identifier: handle,
273 Password: password,
274 })
275 if err != nil {
276 return "", fmt.Errorf("failed to marshal request: %w", err)
277 }
278
279 resp, err := http.Post(
280 pdsURL+"/xrpc/com.atproto.server.createSession",
281 "application/json",
282 bytes.NewReader(reqBody),
283 )
284 if err != nil {
285 return "", fmt.Errorf("failed to call PDS: %w", err)
286 }
287 defer resp.Body.Close()
288
289 if resp.StatusCode != http.StatusOK {
290 body, _ := io.ReadAll(resp.Body)
291 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
292 }
293
294 var session CreateSessionResponse
295 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
296 return "", fmt.Errorf("failed to decode response: %w", err)
297 }
298
299 return session.AccessJwt, nil
300}