A community based topic aggregation platform built on atproto
1package main 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log" 8 "net/http" 9 "os" 10 "time" 11 12 "github.com/go-chi/chi/v5" 13 chiMiddleware "github.com/go-chi/chi/v5/middleware" 14 _ "github.com/lib/pq" 15 "github.com/pressly/goose/v3" 16 17 "Coves/internal/api/handlers/oauth" 18 "Coves/internal/api/middleware" 19 "Coves/internal/api/routes" 20 "Coves/internal/atproto/identity" 21 "Coves/internal/atproto/jetstream" 22 oauthCore "Coves/internal/core/oauth" 23 "Coves/internal/core/users" 24 postgresRepo "Coves/internal/db/postgres" 25) 26 27func main() { 28 // Database configuration (AppView database) 29 dbURL := os.Getenv("DATABASE_URL") 30 if dbURL == "" { 31 // Use dev database from .env.dev 32 dbURL = "postgres://dev_user:dev_password@localhost:5433/coves_dev?sslmode=disable" 33 } 34 35 // Default PDS URL for this Coves instance (supports self-hosting) 36 defaultPDS := os.Getenv("PDS_URL") 37 if defaultPDS == "" { 38 defaultPDS = "http://localhost:3001" // Local dev PDS 39 } 40 41 db, err := sql.Open("postgres", dbURL) 42 if err != nil { 43 log.Fatal("Failed to connect to database:", err) 44 } 45 defer db.Close() 46 47 if err := db.Ping(); err != nil { 48 log.Fatal("Failed to ping database:", err) 49 } 50 51 log.Println("Connected to AppView database") 52 53 // Run migrations 54 if err := goose.SetDialect("postgres"); err != nil { 55 log.Fatal("Failed to set goose dialect:", err) 56 } 57 58 if err := goose.Up(db, "internal/db/migrations"); err != nil { 59 log.Fatal("Failed to run migrations:", err) 60 } 61 62 log.Println("Migrations completed successfully") 63 64 r := chi.NewRouter() 65 66 r.Use(chiMiddleware.Logger) 67 r.Use(chiMiddleware.Recoverer) 68 r.Use(chiMiddleware.RequestID) 69 70 // Rate limiting: 100 requests per minute per IP 71 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute) 72 r.Use(rateLimiter.Middleware) 73 74 // Initialize identity resolver 75 identityConfig := identity.DefaultConfig() 76 // Override from environment if set 77 if plcURL := os.Getenv("IDENTITY_PLC_URL"); plcURL != "" { 78 identityConfig.PLCURL = plcURL 79 } 80 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" { 81 if duration, err := time.ParseDuration(cacheTTL); err == nil { 82 identityConfig.CacheTTL = duration 83 } 84 } 85 86 identityResolver := identity.NewResolver(db, identityConfig) 87 log.Println("Identity resolver initialized with PLC:", identityConfig.PLCURL) 88 89 // Initialize OAuth session store 90 sessionStore := oauthCore.NewPostgresSessionStore(db) 91 log.Println("OAuth session store initialized") 92 93 // Initialize repositories and services 94 userRepo := postgresRepo.NewUserRepository(db) 95 userService := users.NewUserService(userRepo, identityResolver, defaultPDS) 96 97 // Start Jetstream consumer for read-forward user indexing 98 jetstreamURL := os.Getenv("JETSTREAM_URL") 99 if jetstreamURL == "" { 100 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile" 101 } 102 103 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS 104 105 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter) 106 ctx := context.Background() 107 go func() { 108 if err := userConsumer.Start(ctx); err != nil { 109 log.Printf("Jetstream consumer stopped: %v", err) 110 } 111 }() 112 113 log.Printf("Started Jetstream consumer: %s", jetstreamURL) 114 115 // Start OAuth cleanup background job 116 go func() { 117 ticker := time.NewTicker(1 * time.Hour) 118 defer ticker.Stop() 119 for range ticker.C { 120 if pgStore, ok := sessionStore.(*oauthCore.PostgresSessionStore); ok { 121 _ = pgStore.CleanupExpiredRequests(ctx) 122 _ = pgStore.CleanupExpiredSessions(ctx) 123 log.Println("OAuth cleanup completed") 124 } 125 } 126 }() 127 128 log.Println("Started OAuth cleanup background job (runs hourly)") 129 130 // Initialize OAuth cookie store (singleton) 131 cookieSecret, err := oauth.GetEnvBase64OrPlain("OAUTH_COOKIE_SECRET") 132 if err != nil { 133 log.Fatalf("Failed to load OAUTH_COOKIE_SECRET: %v", err) 134 } 135 if cookieSecret == "" { 136 log.Fatal("OAUTH_COOKIE_SECRET not configured") 137 } 138 139 if err := oauth.InitCookieStore(cookieSecret); err != nil { 140 log.Fatalf("Failed to initialize cookie store: %v", err) 141 } 142 143 // Initialize OAuth handlers 144 loginHandler := oauth.NewLoginHandler(identityResolver, sessionStore) 145 callbackHandler := oauth.NewCallbackHandler(sessionStore) 146 logoutHandler := oauth.NewLogoutHandler(sessionStore) 147 148 // OAuth routes (public endpoints) 149 r.Post("/oauth/login", loginHandler.HandleLogin) 150 r.Get("/oauth/callback", callbackHandler.HandleCallback) 151 r.Post("/oauth/logout", logoutHandler.HandleLogout) 152 r.Get("/oauth/client-metadata.json", oauth.HandleClientMetadata) 153 r.Get("/oauth/jwks.json", oauth.HandleJWKS) 154 155 log.Println("OAuth endpoints registered") 156 157 // Register XRPC routes 158 routes.RegisterUserRoutes(r, userService) 159 160 r.Get("/health", func(w http.ResponseWriter, r *http.Request) { 161 w.WriteHeader(http.StatusOK) 162 w.Write([]byte("OK")) 163 }) 164 165 port := os.Getenv("APPVIEW_PORT") 166 if port == "" { 167 port = "8081" // Match .env.dev default 168 } 169 170 fmt.Printf("Coves AppView starting on port %s\n", port) 171 fmt.Printf("Default PDS: %s\n", defaultPDS) 172 log.Fatal(http.ListenAndServe(":"+port, r)) 173}