A community-based topic aggregation platform built on atproto
1package main
2
3import (
4 "bytes"
5 "context"
6 "database/sql"
7 "encoding/json"
8 "fmt"
9 "io"
10 "log"
11 "net/http"
12 "os"
13 "time"
14
15 "github.com/go-chi/chi/v5"
16 chiMiddleware "github.com/go-chi/chi/v5/middleware"
17 _ "github.com/lib/pq"
18 "github.com/pressly/goose/v3"
19
20 "Coves/internal/api/handlers/oauth"
21 "Coves/internal/api/middleware"
22 "Coves/internal/api/routes"
23 "Coves/internal/atproto/did"
24 "Coves/internal/atproto/identity"
25 "Coves/internal/atproto/jetstream"
26 "Coves/internal/core/communities"
27 oauthCore "Coves/internal/core/oauth"
28 "Coves/internal/core/users"
29 postgresRepo "Coves/internal/db/postgres"
30)
31
32func main() {
33 // Database configuration (AppView database)
34 dbURL := os.Getenv("DATABASE_URL")
35 if dbURL == "" {
36 // Use dev database from .env.dev
37 dbURL = "postgres://dev_user:dev_password@localhost:5433/coves_dev?sslmode=disable"
38 }
39
40 // Default PDS URL for this Coves instance (supports self-hosting)
41 defaultPDS := os.Getenv("PDS_URL")
42 if defaultPDS == "" {
43 defaultPDS = "http://localhost:3001" // Local dev PDS
44 }
45
46 db, err := sql.Open("postgres", dbURL)
47 if err != nil {
48 log.Fatal("Failed to connect to database:", err)
49 }
50 defer db.Close()
51
52 if err := db.Ping(); err != nil {
53 log.Fatal("Failed to ping database:", err)
54 }
55
56 log.Println("Connected to AppView database")
57
58 // Run migrations
59 if err := goose.SetDialect("postgres"); err != nil {
60 log.Fatal("Failed to set goose dialect:", err)
61 }
62
63 if err := goose.Up(db, "internal/db/migrations"); err != nil {
64 log.Fatal("Failed to run migrations:", err)
65 }
66
67 log.Println("Migrations completed successfully")
68
69 r := chi.NewRouter()
70
71 r.Use(chiMiddleware.Logger)
72 r.Use(chiMiddleware.Recoverer)
73 r.Use(chiMiddleware.RequestID)
74
75 // Rate limiting: 100 requests per minute per IP
76 rateLimiter := middleware.NewRateLimiter(100, 1*time.Minute)
77 r.Use(rateLimiter.Middleware)
78
79 // Initialize identity resolver
80 identityConfig := identity.DefaultConfig()
81 // Override from environment if set
82 if plcURL := os.Getenv("IDENTITY_PLC_URL"); plcURL != "" {
83 identityConfig.PLCURL = plcURL
84 }
85 if cacheTTL := os.Getenv("IDENTITY_CACHE_TTL"); cacheTTL != "" {
86 if duration, err := time.ParseDuration(cacheTTL); err == nil {
87 identityConfig.CacheTTL = duration
88 }
89 }
90
91 identityResolver := identity.NewResolver(db, identityConfig)
92 log.Println("Identity resolver initialized with PLC:", identityConfig.PLCURL)
93
94 // Initialize OAuth session store
95 sessionStore := oauthCore.NewPostgresSessionStore(db)
96 log.Println("OAuth session store initialized")
97
98 // Initialize repositories and services
99 userRepo := postgresRepo.NewUserRepository(db)
100 userService := users.NewUserService(userRepo, identityResolver, defaultPDS)
101
102 communityRepo := postgresRepo.NewCommunityRepository(db)
103
104 // Initialize DID generator for communities
105 // IS_DEV_ENV=true: Generate did:plc:xxx without registering to PLC directory
106 // IS_DEV_ENV=false: Generate did:plc:xxx and register with PLC_DIRECTORY_URL
107 isDevEnv := os.Getenv("IS_DEV_ENV") == "true"
108 plcDirectoryURL := os.Getenv("PLC_DIRECTORY_URL")
109 if plcDirectoryURL == "" {
110 plcDirectoryURL = "https://plc.directory" // Default to Bluesky's PLC
111 }
112 didGenerator := did.NewGenerator(isDevEnv, plcDirectoryURL)
113 log.Printf("DID generator initialized (dev_mode=%v, plc_url=%s)", isDevEnv, plcDirectoryURL)
114
115 instanceDID := os.Getenv("INSTANCE_DID")
116 if instanceDID == "" {
117 instanceDID = "did:web:coves.local" // Default for development
118 }
119 communityService := communities.NewCommunityService(communityRepo, didGenerator, defaultPDS, instanceDID)
120
121 // Authenticate Coves instance with PDS to enable community record writes
122 // The instance needs a PDS account to write community records it owns
123 pdsHandle := os.Getenv("PDS_INSTANCE_HANDLE")
124 pdsPassword := os.Getenv("PDS_INSTANCE_PASSWORD")
125 if pdsHandle != "" && pdsPassword != "" {
126 log.Printf("Authenticating Coves instance (%s) with PDS...", instanceDID)
127 accessToken, err := authenticateWithPDS(defaultPDS, pdsHandle, pdsPassword)
128 if err != nil {
129 log.Printf("Warning: Failed to authenticate with PDS: %v", err)
130 log.Println("Community creation will fail until PDS authentication is configured")
131 } else {
132 if svc, ok := communityService.(interface{ SetPDSAccessToken(string) }); ok {
133 svc.SetPDSAccessToken(accessToken)
134 log.Println("✓ Coves instance authenticated with PDS")
135 }
136 }
137 } else {
138 log.Println("Note: PDS_INSTANCE_HANDLE and PDS_INSTANCE_PASSWORD not set")
139 log.Println("Community creation via write-forward is disabled")
140 }
141
142 // Start Jetstream consumer for read-forward user indexing
143 jetstreamURL := os.Getenv("JETSTREAM_URL")
144 if jetstreamURL == "" {
145 jetstreamURL = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.actor.profile"
146 }
147
148 pdsFilter := os.Getenv("JETSTREAM_PDS_FILTER") // Optional: filter to specific PDS
149
150 userConsumer := jetstream.NewUserEventConsumer(userService, identityResolver, jetstreamURL, pdsFilter)
151 ctx := context.Background()
152 go func() {
153 if err := userConsumer.Start(ctx); err != nil {
154 log.Printf("Jetstream consumer stopped: %v", err)
155 }
156 }()
157
158 log.Printf("Started Jetstream user consumer: %s", jetstreamURL)
159
160 // Note: Community indexing happens through the same Jetstream firehose
161 // The CommunityEventConsumer is used by handlers when processing community-related events
162 // For now, community records are created via write-forward to PDS, then indexed when
163 // they appear in the firehose. A dedicated consumer can be added later if needed.
164 log.Println("Community event consumer initialized (processes events from firehose)")
165
166 // Start OAuth cleanup background job
167 go func() {
168 ticker := time.NewTicker(1 * time.Hour)
169 defer ticker.Stop()
170 for range ticker.C {
171 if pgStore, ok := sessionStore.(*oauthCore.PostgresSessionStore); ok {
172 _ = pgStore.CleanupExpiredRequests(ctx)
173 _ = pgStore.CleanupExpiredSessions(ctx)
174 log.Println("OAuth cleanup completed")
175 }
176 }
177 }()
178
179 log.Println("Started OAuth cleanup background job (runs hourly)")
180
181 // Initialize OAuth cookie store (singleton)
182 cookieSecret, err := oauth.GetEnvBase64OrPlain("OAUTH_COOKIE_SECRET")
183 if err != nil {
184 log.Fatalf("Failed to load OAUTH_COOKIE_SECRET: %v", err)
185 }
186 if cookieSecret == "" {
187 log.Fatal("OAUTH_COOKIE_SECRET not configured")
188 }
189
190 if err := oauth.InitCookieStore(cookieSecret); err != nil {
191 log.Fatalf("Failed to initialize cookie store: %v", err)
192 }
193
194 // Initialize OAuth handlers
195 loginHandler := oauth.NewLoginHandler(identityResolver, sessionStore)
196 callbackHandler := oauth.NewCallbackHandler(sessionStore)
197 logoutHandler := oauth.NewLogoutHandler(sessionStore)
198
199 // OAuth routes (public endpoints)
200 r.Post("/oauth/login", loginHandler.HandleLogin)
201 r.Get("/oauth/callback", callbackHandler.HandleCallback)
202 r.Post("/oauth/logout", logoutHandler.HandleLogout)
203 r.Get("/oauth/client-metadata.json", oauth.HandleClientMetadata)
204 r.Get("/oauth/jwks.json", oauth.HandleJWKS)
205
206 log.Println("OAuth endpoints registered")
207
208 // Register XRPC routes
209 routes.RegisterUserRoutes(r, userService)
210 routes.RegisterCommunityRoutes(r, communityService)
211 log.Println("Community XRPC endpoints registered")
212
213 r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
214 w.WriteHeader(http.StatusOK)
215 w.Write([]byte("OK"))
216 })
217
218 port := os.Getenv("APPVIEW_PORT")
219 if port == "" {
220 port = "8081" // Match .env.dev default
221 }
222
223 fmt.Printf("Coves AppView starting on port %s\n", port)
224 fmt.Printf("Default PDS: %s\n", defaultPDS)
225 log.Fatal(http.ListenAndServe(":"+port, r))
226}
227
// authenticateWithPDS creates a session on the PDS by POSTing to
// com.atproto.server.createSession and returns the session's access token.
//
// pdsURL is the PDS base URL; the XRPC path is appended to it verbatim.
// handle is sent as the session identifier and password as its password.
//
// An error is returned when the request cannot be encoded or sent (including
// when the PDS does not answer within the client timeout), when the PDS
// responds with a status other than 200 (the error carries the status code
// and at most 4 KiB of the response body), when the response body is not
// valid JSON, or when the response contains no access token.
func authenticateWithPDS(pdsURL, handle, password string) (string, error) {
	type createSessionRequest struct {
		Identifier string `json:"identifier"`
		Password   string `json:"password"`
	}

	type createSessionResponse struct {
		DID       string `json:"did"`
		Handle    string `json:"handle"`
		AccessJwt string `json:"accessJwt"`
	}

	reqBody, err := json.Marshal(createSessionRequest{
		Identifier: handle,
		Password:   password,
	})
	if err != nil {
		return "", fmt.Errorf("failed to marshal request: %w", err)
	}

	// http.Post uses http.DefaultClient, which has no timeout; an unresponsive
	// PDS would then block the caller forever. Bound the whole exchange.
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Post(
		pdsURL+"/xrpc/com.atproto.server.createSession",
		"application/json",
		bytes.NewReader(reqBody),
	)
	if err != nil {
		return "", fmt.Errorf("failed to call PDS: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// The body is only used to enrich the error message, so a read failure
		// is deliberately ignored, and the read is capped so an oversized
		// error page cannot be pulled into memory.
		body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
		return "", fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body))
	}

	var session createSessionResponse
	if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
		return "", fmt.Errorf("failed to decode response: %w", err)
	}

	// A 200 response without a token must not be reported as success.
	if session.AccessJwt == "" {
		return "", fmt.Errorf("PDS response did not include an access token")
	}

	return session.AccessJwt, nil
}