A community-based topic aggregation platform built on atproto.
1package main
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log"
8 "net/http"
9 "os"
10 "time"
11
12 "github.com/go-chi/chi/v5"
13 chiMiddleware "github.com/go-chi/chi/v5/middleware"
14 _ "github.com/lib/pq"
15 "github.com/pressly/goose/v3"
16
17 "Coves/internal/api/handlers/oauth"
18 "Coves/internal/api/middleware"
19 "Coves/internal/api/routes"
20 "Coves/internal/atproto/identity"
21 "Coves/internal/atproto/jetstream"
22 oauthCore "Coves/internal/core/oauth"
23 "Coves/internal/core/users"
24 postgresRepo "Coves/internal/db/postgres"
25)
26
27func main() {
28 // Database configuration (AppView database)
29 dbURL := os.Getenv("DATABASE_URL")
30 if dbURL == "" {
31 // Use dev database from .env.dev
32 dbURL = "postgres://dev_user:dev_password@localhost:5433/coves_dev?sslmode=disable"
33 }
34
35 // Default PDS URL for this Coves instance (supports self-hosting)
36 defaultPDS := os.Getenv("PDS_URL")
37 if defaultPDS == "" {
38 defaultPDS = "http://localhost:3001" // Local dev PDS
39 }
40
41 db, err := sql.Open("postgres", dbURL)
42 if err != nil {
43 log.Fatal("Failed to connect to database:", err)
44 }
45 defer db.Close()
46
47 if err := db.Ping(); err != nil {
48 log.Fatal("Failed to ping database:", err)
49 }
50
51 log.Println("Connected to AppView database")
52
53 // Run migrations
54 if err := goose.SetDialect("postgres"); err != nil {
55 log.Fatal("Failed to set goose dialect:", err)
56 }
57
58 if err := goose.Up(db, "internal/db/migrations"); err != nil {
59 log.Fatal("Failed to run migrations:", err)
60 }
61
62 log.Println("Migrations completed successfully")
63
64 r := chi.NewRouter()
65
66 r.Use(chiMiddleware.Logger)
67 r.Use(chiMiddleware.Recoverer)
68 r.Use(chiMiddleware.RequestID)
69
70 // Rate limiting: 100 requests per minute per IP
71 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
72 r.Use(rateLimiter.Middleware)
73
74 // Initialize identity resolver
75 identityConfig := identity.DefaultConfig()
76 // Override from environment if set
77 if plcURL := os.Getenv("IDENTITY_PLC_URL"); plcURL != "" {
78 identityConfig.PLCURL = plcURL
79 }
80 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
81 if duration, err := time.ParseDuration(cacheTTL); err == nil {
82 identityConfig.CacheTTL = duration
83 }
84 }
85
86 identityResolver := identity.NewResolver(db, identityConfig)
87 log.Println("Identity resolver initialized with PLC:", identityConfig.PLCURL)
88
89 // Initialize OAuth session store
90 sessionStore := oauthCore.NewPostgresSessionStore(db)
91 log.Println("OAuth session store initialized")
92
93 // Initialize repositories and services
94 userRepo := postgresRepo.NewUserRepository(db)
95 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
96
97 // Start Jetstream consumer for read-forward user indexing
98 jetstreamURL := os.Getenv("JETSTREAM_URL")
99 if jetstreamURL == "" {
100 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
101 }
102
103 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
104
105 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
106 ctx := context.Background()
107 go func() {
108 if err := userConsumer.Start(ctx); err != nil {
109 log.Printf("Jetstream consumer stopped: %v", err)
110 }
111 }()
112
113 log.Printf("Started Jetstream consumer: %s", jetstreamURL)
114
115 // Start OAuth cleanup background job
116 go func() {
117 ticker := time.NewTicker(1 * time.Hour)
118 defer ticker.Stop()
119 for range ticker.C {
120 if pgStore, ok := sessionStore.(*oauthCore.PostgresSessionStore); ok {
121 _ = pgStore.CleanupExpiredRequests(ctx)
122 _ = pgStore.CleanupExpiredSessions(ctx)
123 log.Println("OAuth cleanup completed")
124 }
125 }
126 }()
127
128 log.Println("Started OAuth cleanup background job (runs hourly)")
129
130 // Initialize OAuth cookie store (singleton)
131 cookieSecret, err := oauth.GetEnvBase64OrPlain("OAUTH_COOKIE_SECRET")
132 if err != nil {
133 log.Fatalf("Failed to load OAUTH_COOKIE_SECRET: %v", err)
134 }
135 if cookieSecret == "" {
136 log.Fatal("OAUTH_COOKIE_SECRET not configured")
137 }
138
139 if err := oauth.InitCookieStore(cookieSecret); err != nil {
140 log.Fatalf("Failed to initialize cookie store: %v", err)
141 }
142
143 // Initialize OAuth handlers
144 loginHandler := oauth.NewLoginHandler(identityResolver, sessionStore)
145 callbackHandler := oauth.NewCallbackHandler(sessionStore)
146 logoutHandler := oauth.NewLogoutHandler(sessionStore)
147
148 // OAuth routes (public endpoints)
149 r.Post("/oauth/login", loginHandler.HandleLogin)
150 r.Get("/oauth/callback", callbackHandler.HandleCallback)
151 r.Post("/oauth/logout", logoutHandler.HandleLogout)
152 r.Get("/oauth/client-metadata.json", oauth.HandleClientMetadata)
153 r.Get("/oauth/jwks.json", oauth.HandleJWKS)
154
155 log.Println("OAuth endpoints registered")
156
157 // Register XRPC routes
158 routes.RegisterUserRoutes(r, userService)
159
160 r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
161 w.WriteHeader(http.StatusOK)
162 w.Write([]byte("OK"))
163 })
164
165 port := os.Getenv("APPVIEW_PORT")
166 if port == "" {
167 port = "8081" // Match .env.dev default
168 }
169
170 fmt.Printf("Coves AppView starting on port %s\n", port)
171 fmt.Printf("Default PDS: %s\n", defaultPDS)
172 log.Fatal(http.ListenAndServe(":"+port, r))
173}