A community-based topic aggregation platform built on atproto
1package main 2 3import ( 4 "bytes" 5 "context" 6 "database/sql" 7 "encoding/json" 8 "fmt" 9 "io" 10 "log" 11 "net/http" 12 "os" 13 "time" 14 15 "github.com/go-chi/chi/v5" 16 chiMiddleware "github.com/go-chi/chi/v5/middleware" 17 _ "github.com/lib/pq" 18 "github.com/pressly/goose/v3" 19 20 "Coves/internal/api/handlers/oauth" 21 "Coves/internal/api/middleware" 22 "Coves/internal/api/routes" 23 "Coves/internal/atproto/did" 24 "Coves/internal/atproto/identity" 25 "Coves/internal/atproto/jetstream" 26 "Coves/internal/core/communities" 27 oauthCore "Coves/internal/core/oauth" 28 "Coves/internal/core/users" 29 postgresRepo "Coves/internal/db/postgres" 30) 31 32func main() { 33 // Database configuration (AppView database) 34 dbURL := os.Getenv("DATABASE_URL") 35 if dbURL == "" { 36 // Use dev database from .env.dev 37 dbURL = "postgres://dev_user:dev_password@localhost:5433/coves_dev?sslmode=disable" 38 } 39 40 // Default PDS URL for this Coves instance (supports self-hosting) 41 defaultPDS := os.Getenv("PDS_URL") 42 if defaultPDS == "" { 43 defaultPDS = "http://localhost:3001" // Local dev PDS 44 } 45 46 db, err := sql.Open("postgres", dbURL) 47 if err != nil { 48 log.Fatal("Failed to connect to database:", err) 49 } 50 defer db.Close() 51 52 if err := db.Ping(); err != nil { 53 log.Fatal("Failed to ping database:", err) 54 } 55 56 log.Println("Connected to AppView database") 57 58 // Run migrations 59 if err := goose.SetDialect("postgres"); err != nil { 60 log.Fatal("Failed to set goose dialect:", err) 61 } 62 63 if err := goose.Up(db, "internal/db/migrations"); err != nil { 64 log.Fatal("Failed to run migrations:", err) 65 } 66 67 log.Println("Migrations completed successfully") 68 69 r := chi.NewRouter() 70 71 r.Use(chiMiddleware.Logger) 72 r.Use(chiMiddleware.Recoverer) 73 r.Use(chiMiddleware.RequestID) 74 75 // Rate limiting: 100 requests per minute per IP 76 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 77 r.Use(rateLimiter.Middleware) 78 79 // 
Initialize identity resolver 80 identityConfig := identity.DefaultConfig() 81 // Override from environment if set 82 if plcURL := os.Getenv("IDENTITY_PLC_URL"); plcURL != "" { 83 identityConfig.PLCURL = plcURL 84 } 85 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 86 if duration, err := time.ParseDuration(cacheTTL); err == nil { 87 identityConfig.CacheTTL = duration 88 } 89 } 90 91 identityResolver := identity.NewResolver(db, identityConfig) 92 log.Println("Identity resolver initialized with PLC:", identityConfig.PLCURL) 93 94 // Initialize OAuth session store 95 sessionStore := oauthCore.NewPostgresSessionStore(db) 96 log.Println("OAuth session store initialized") 97 98 // Initialize repositories and services 99 userRepo := postgresRepo.NewUserRepository(db) 100 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 101 102 communityRepo := postgresRepo.NewCommunityRepository(db) 103 104 // Initialize DID generator for communities 105 // IS_DEV_ENV=true: Generate did:plc:xxx without registering to PLC directory 106 // IS_DEV_ENV=false: Generate did:plc:xxx and register with PLC_DIRECTORY_URL 107 isDevEnv := os.Getenv("IS_DEV_ENV") == "true" 108 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL") 109 if plcDirectoryURL == "" { 110 plcDirectoryURL = "https://plc.directory" // Default to Bluesky's PLC 111 } 112 didGenerator := did.NewGenerator(isDevEnv, plcDirectoryURL) 113 log.Printf("DID generator initialized (dev_mode=%v, plc_url=%s)", isDevEnv, plcDirectoryURL) 114 115 instanceDID := os.Getenv("INSTANCE_DID") 116 if instanceDID == "" { 117 instanceDID = "did:web:coves.local" // Default for development 118 } 119 communityService := communities.NewCommunityService(communityRepo, didGenerator, defaultPDS, instanceDID) 120 121 // Authenticate Coves instance with PDS to enable community record writes 122 // The instance needs a PDS account to write community records it owns 123 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE") 124 
pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD") 125 if pdsHandle != "" && pdsPassword != "" { 126 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID) 127 accessToken, err := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword) 128 if err != nil { 129 log.Printf("Warning: Failed to authenticate with PDS: %v", err) 130 log.Println("Community creation will fail until PDS authentication is configured") 131 } else { 132 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok { 133 svc.SetPDSAccessToken(accessToken) 134 log.Println("✓ Coves instance authenticated with PDS") 135 } 136 } 137 } else { 138 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set") 139 log.Println("Community creation via write-forward is disabled") 140 } 141 142 // Start Jetstream consumer for read-forward user indexing 143 jetstreamURL := os.Getenv("JETSTREAM_URL") 144 if jetstreamURL == "" { 145 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 146 } 147 148 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 149 150 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 151 ctx := context.Background() 152 go func() { 153 if err := userConsumer.Start(ctx); err != nil { 154 log.Printf("Jetstream consumer stopped: %v", err) 155 } 156 }() 157 158 log.Printf("Started Jetstream user consumer: %s", jetstreamURL) 159 160 // Note: Community indexing happens through the same Jetstream firehose 161 // The CommunityEventConsumer is used by handlers when processing community-related events 162 // For now, community records are created via write-forward to PDS, then indexed when 163 // they appear in the firehose. A dedicated consumer can be added later if needed. 
164 log.Println("Community event consumer initialized (processes events from firehose)") 165 166 // Start OAuth cleanup background job 167 go func() { 168 ticker := time.NewTicker(1 * time.Hour) 169 defer ticker.Stop() 170 for range ticker.C { 171 if pgStore, ok := sessionStore.(*oauthCore.PostgresSessionStore); ok { 172 _ = pgStore.CleanupExpiredRequests(ctx) 173 _ = pgStore.CleanupExpiredSessions(ctx) 174 log.Println("OAuth cleanup completed") 175 } 176 } 177 }() 178 179 log.Println("Started OAuth cleanup background job (runs hourly)") 180 181 // Initialize OAuth cookie store (singleton) 182 cookieSecret, err := oauth.GetEnvBase64OrPlain("OAUTH_COOKIE_SECRET") 183 if err != nil { 184 log.Fatalf("Failed to load OAUTH_COOKIE_SECRET: %v", err) 185 } 186 if cookieSecret == "" { 187 log.Fatal("OAUTH_COOKIE_SECRET not configured") 188 } 189 190 if err := oauth.InitCookieStore(cookieSecret); err != nil { 191 log.Fatalf("Failed to initialize cookie store: %v", err) 192 } 193 194 // Initialize OAuth handlers 195 loginHandler := oauth.NewLoginHandler(identityResolver, sessionStore) 196 callbackHandler := oauth.NewCallbackHandler(sessionStore) 197 logoutHandler := oauth.NewLogoutHandler(sessionStore) 198 199 // OAuth routes (public endpoints) 200 r.Post("/oauth/login", loginHandler.HandleLogin) 201 r.Get("/oauth/callback", callbackHandler.HandleCallback) 202 r.Post("/oauth/logout", logoutHandler.HandleLogout) 203 r.Get("/oauth/client-metadata.json", oauth.HandleClientMetadata) 204 r.Get("/oauth/jwks.json", oauth.HandleJWKS) 205 206 log.Println("OAuth endpoints registered") 207 208 // Register XRPC routes 209 routes.RegisterUserRoutes(r, userService) 210 routes.RegisterCommunityRoutes(r, communityService) 211 log.Println("Community XRPC endpoints registered") 212 213 r.Get("/health", func(w http.ResponseWriter, r *http.Request) { 214 w.WriteHeader(http.StatusOK) 215 w.Write([]byte("OK")) 216 }) 217 218 port := os.Getenv("APPVIEW_PORT") 219 if port == "" { 220 port = "8081" 
// Match .env.dev default 221 } 222 223 fmt.Printf("Coves AppView starting on port %s\n", port) 224 fmt.Printf("Default PDS: %s\n", defaultPDS) 225 log.Fatal(http.ListenAndServe(":"+port, r)) 226} 227 228// authenticateWithPDS creates a session on the PDS and returns an access token 229func authenticateWithPDS(pdsURL, handle, password string) (string, error) { 230 type CreateSessionRequest struct { 231 Identifier string `json:"identifier"` 232 Password string `json:"password"` 233 } 234 235 type CreateSessionResponse struct { 236 DID string `json:"did"` 237 Handle string `json:"handle"` 238 AccessJwt string `json:"accessJwt"` 239 } 240 241 reqBody, err := json.Marshal(CreateSessionRequest{ 242 Identifier: handle, 243 Password: password, 244 }) 245 if err != nil { 246 return "", fmt.Errorf("failed to marshal request: %w", err) 247 } 248 249 resp, err := http.Post( 250 pdsURL+"/xrpc/com.atproto.server.createSession", 251 "application/json", 252 bytes.NewReader(reqBody), 253 ) 254 if err != nil { 255 return "", fmt.Errorf("failed to call PDS: %w", err) 256 } 257 defer resp.Body.Close() 258 259 if resp.StatusCode != http.StatusOK { 260 body, _ := io.ReadAll(resp.Body) 261 return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 262 } 263 264 var session CreateSessionResponse 265 if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 266 return "", fmt.Errorf("failed to decode response: %w", err) 267 } 268 269 return session.AccessJwt, nil 270}