A community-based topic aggregation platform built on atproto
1package posts
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "log"
10 "net/http"
11 "os"
12 "time"
13
14 "Coves/internal/api/middleware"
15 "Coves/internal/core/aggregators"
16 "Coves/internal/core/blobs"
17 "Coves/internal/core/communities"
18 "Coves/internal/core/unfurl"
19)
20
21type postService struct {
22 repo Repository
23 communityService communities.Service
24 aggregatorService aggregators.Service
25 blobService blobs.Service
26 unfurlService unfurl.Service
27 pdsURL string
28}
29
30// NewPostService creates a new post service
31// aggregatorService, blobService, and unfurlService can be nil if not needed (e.g., in tests or minimal setups)
32func NewPostService(
33 repo Repository,
34 communityService communities.Service,
35 aggregatorService aggregators.Service, // Optional: can be nil
36 blobService blobs.Service, // Optional: can be nil
37 unfurlService unfurl.Service, // Optional: can be nil
38 pdsURL string,
39) Service {
40 return &postService{
41 repo: repo,
42 communityService: communityService,
43 aggregatorService: aggregatorService,
44 blobService: blobService,
45 unfurlService: unfurlService,
46 pdsURL: pdsURL,
47 }
48}
49
50// CreatePost creates a new post in a community
51// Flow:
52// 1. Validate input
53// 2. Check if author is an aggregator (server-side validation using DID from JWT)
54// 3. If aggregator: validate authorization and rate limits, skip membership checks
55// 4. If user: resolve community and perform membership/ban validation
56// 5. Build post record
57// 6. Write to community's PDS repository
58// 7. If aggregator: record post for rate limiting
59// 8. Return URI/CID (AppView indexes asynchronously via Jetstream)
60func (s *postService) CreatePost(ctx context.Context, req CreatePostRequest) (*CreatePostResponse, error) {
61 // 1. Validate basic input (before DID checks to give clear validation errors)
62 if err := s.validateCreateRequest(req); err != nil {
63 return nil, err
64 }
65
66 // 2. SECURITY: Extract authenticated DID from context (set by JWT middleware)
67 // Defense-in-depth: verify service layer receives correct DID even if handler is bypassed
68 authenticatedDID := middleware.GetAuthenticatedDID(ctx)
69 if authenticatedDID == "" {
70 return nil, fmt.Errorf("no authenticated DID in context - authentication required")
71 }
72
73 // SECURITY: Verify request DID matches authenticated DID from JWT
74 // This prevents DID spoofing where a malicious client or compromised handler
75 // could provide a different DID than what was authenticated
76 if authenticatedDID != req.AuthorDID {
77 log.Printf("[SECURITY] DID mismatch: authenticated=%s, request=%s", authenticatedDID, req.AuthorDID)
78 return nil, fmt.Errorf("authenticated DID does not match author DID")
79 }
80
81 // 3. Determine actor type: Kagi aggregator, other aggregator, or regular user
82 kagiAggregatorDID := os.Getenv("KAGI_AGGREGATOR_DID")
83 isTrustedKagi := kagiAggregatorDID != "" && req.AuthorDID == kagiAggregatorDID
84
85 // Check if this is a non-Kagi aggregator (requires database lookup)
86 var isOtherAggregator bool
87 var err error
88 if !isTrustedKagi && s.aggregatorService != nil {
89 isOtherAggregator, err = s.aggregatorService.IsAggregator(ctx, req.AuthorDID)
90 if err != nil {
91 log.Printf("[POST-CREATE] Warning: failed to check if DID is aggregator: %v", err)
92 // Don't fail the request - treat as regular user if check fails
93 isOtherAggregator = false
94 }
95 }
96
97 // 4. Resolve community at-identifier (handle or DID) to DID
98 // This accepts both formats per atProto best practices:
99 // - Handles: !gardening.communities.coves.social
100 // - DIDs: did:plc:abc123 or did:web:coves.social
101 communityDID, err := s.communityService.ResolveCommunityIdentifier(ctx, req.Community)
102 if err != nil {
103 // Handle specific error types appropriately
104 if communities.IsNotFound(err) {
105 return nil, ErrCommunityNotFound
106 }
107 if communities.IsValidationError(err) {
108 // Pass through validation errors (invalid format, etc.)
109 return nil, NewValidationError("community", err.Error())
110 }
111 // Infrastructure failures (DB errors, network issues) should be internal errors
112 // Don't leak internal details to client (e.g., "pq: connection refused")
113 return nil, fmt.Errorf("failed to resolve community identifier: %w", err)
114 }
115
116 // 5. AUTHORIZATION: For non-Kagi aggregators, validate authorization and rate limits
117 // Kagi is exempted from database checks via env var (temporary until XRPC endpoint is ready)
118 if isOtherAggregator && s.aggregatorService != nil {
119 if err := s.aggregatorService.ValidateAggregatorPost(ctx, req.AuthorDID, communityDID); err != nil {
120 log.Printf("[POST-CREATE] Aggregator authorization failed: %s -> %s: %v", req.AuthorDID, communityDID, err)
121 return nil, fmt.Errorf("aggregator not authorized: %w", err)
122 }
123 log.Printf("[POST-CREATE] Aggregator authorized: %s -> %s", req.AuthorDID, communityDID)
124 }
125
126 // 6. Fetch community from AppView (includes all metadata)
127 community, err := s.communityService.GetByDID(ctx, communityDID)
128 if err != nil {
129 if communities.IsNotFound(err) {
130 return nil, ErrCommunityNotFound
131 }
132 return nil, fmt.Errorf("failed to fetch community: %w", err)
133 }
134
135 // 7. Apply validation based on actor type (aggregator vs user)
136 if isTrustedKagi {
137 // TRUSTED AGGREGATOR VALIDATION FLOW
138 // Kagi aggregator is authorized via KAGI_AGGREGATOR_DID env var (temporary)
139 // TODO: Replace with proper XRPC aggregator authorization endpoint
140 log.Printf("[POST-CREATE] Trusted Kagi aggregator detected: %s posting to community: %s", req.AuthorDID, communityDID)
141 // Aggregators skip membership checks and visibility restrictions
142 // They are authorized services, not community members
143 } else if isOtherAggregator {
144 // OTHER AGGREGATOR VALIDATION FLOW
145 // Authorization and rate limits already validated above via ValidateAggregatorPost
146 log.Printf("[POST-CREATE] Authorized aggregator detected: %s posting to community: %s", req.AuthorDID, communityDID)
147 } else {
148 // USER VALIDATION FLOW
149 // Check community visibility (Alpha: public/unlisted only)
150 // Beta will add membership checks for private communities
151 if community.Visibility == "private" {
152 return nil, ErrNotAuthorized
153 }
154 }
155
156 // 8. Ensure community has fresh PDS credentials (token refresh if needed)
157 community, err = s.communityService.EnsureFreshToken(ctx, community)
158 if err != nil {
159 return nil, fmt.Errorf("failed to refresh community credentials: %w", err)
160 }
161
162 // 9. Build post record for PDS
163 postRecord := PostRecord{
164 Type: "social.coves.community.post",
165 Community: communityDID,
166 Author: req.AuthorDID,
167 Title: req.Title,
168 Content: req.Content,
169 Facets: req.Facets,
170 Embed: req.Embed, // Start with user-provided embed
171 Labels: req.Labels,
172 OriginalAuthor: req.OriginalAuthor,
173 FederatedFrom: req.FederatedFrom,
174 Location: req.Location,
175 CreatedAt: time.Now().UTC().Format(time.RFC3339),
176 }
177
178 // 10. Validate and enhance external embeds
179 if postRecord.Embed != nil {
180 if embedType, ok := postRecord.Embed["$type"].(string); ok && embedType == "social.coves.embed.external" {
181 if external, ok := postRecord.Embed["external"].(map[string]interface{}); ok {
182 // SECURITY: Validate thumb field (must be blob, not URL string)
183 // This validation happens BEFORE unfurl to catch client errors early
184 if existingThumb := external["thumb"]; existingThumb != nil {
185 if thumbStr, isString := existingThumb.(string); isString {
186 return nil, NewValidationError("thumb",
187 fmt.Sprintf("thumb must be a blob reference (with $type, ref, mimeType, size), not URL string: %s", thumbStr))
188 }
189
190 // Validate blob structure if provided
191 if thumbMap, isMap := existingThumb.(map[string]interface{}); isMap {
192 // Check for $type field
193 if thumbType, ok := thumbMap["$type"].(string); !ok || thumbType != "blob" {
194 return nil, NewValidationError("thumb",
195 fmt.Sprintf("thumb must have $type: blob (got: %v)", thumbType))
196 }
197 // Check for required blob fields
198 if _, hasRef := thumbMap["ref"]; !hasRef {
199 return nil, NewValidationError("thumb", "thumb blob missing required 'ref' field")
200 }
201 if _, hasMimeType := thumbMap["mimeType"]; !hasMimeType {
202 return nil, NewValidationError("thumb", "thumb blob missing required 'mimeType' field")
203 }
204 log.Printf("[POST-CREATE] Client provided valid thumbnail blob")
205 } else {
206 return nil, NewValidationError("thumb",
207 fmt.Sprintf("thumb must be a blob object, got: %T", existingThumb))
208 }
209 }
210
211 // TRUSTED AGGREGATOR: Allow Kagi aggregator to provide thumbnail URLs directly
212 // This bypasses unfurl for more accurate RSS-sourced thumbnails
213 if req.ThumbnailURL != nil && *req.ThumbnailURL != "" && isTrustedKagi {
214 log.Printf("[AGGREGATOR-THUMB] Trusted aggregator provided thumbnail: %s", *req.ThumbnailURL)
215
216 if s.blobService != nil {
217 blobCtx, blobCancel := context.WithTimeout(ctx, 15*time.Second)
218 defer blobCancel()
219
220 blob, blobErr := s.blobService.UploadBlobFromURL(blobCtx, community, *req.ThumbnailURL)
221 if blobErr != nil {
222 log.Printf("[AGGREGATOR-THUMB] Failed to upload thumbnail: %v", blobErr)
223 // No fallback - aggregators only use RSS feed thumbnails
224 } else {
225 external["thumb"] = blob
226 log.Printf("[AGGREGATOR-THUMB] Successfully uploaded thumbnail from trusted aggregator")
227 }
228 }
229 }
230
231 // Unfurl enhancement (optional, only if URL is supported)
232 // Skip unfurl for trusted aggregators - they provide their own metadata
233 if !isTrustedKagi {
234 if uri, ok := external["uri"].(string); ok && uri != "" {
235 // Check if we support unfurling this URL
236 if s.unfurlService != nil && s.unfurlService.IsSupported(uri) {
237 log.Printf("[POST-CREATE] Unfurling URL: %s", uri)
238
239 // Unfurl with timeout (non-fatal if it fails)
240 unfurlCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
241 defer cancel()
242
243 result, err := s.unfurlService.UnfurlURL(unfurlCtx, uri)
244 if err != nil {
245 // Log but don't fail - user can still post with manual metadata
246 log.Printf("[POST-CREATE] Warning: Failed to unfurl URL %s: %v", uri, err)
247 } else {
248 // Enhance embed with fetched metadata (only if client didn't provide)
249 // Note: We respect client-provided values, even empty strings
250 // If client sends title="", we assume they want no title
251 if external["title"] == nil {
252 external["title"] = result.Title
253 }
254 if external["description"] == nil {
255 external["description"] = result.Description
256 }
257 // Always set metadata fields (provider, domain, type)
258 external["embedType"] = result.Type
259 external["provider"] = result.Provider
260 external["domain"] = result.Domain
261
262 // Upload thumbnail from unfurl if client didn't provide one
263 // (Thumb validation already happened above)
264 if external["thumb"] == nil {
265 if result.ThumbnailURL != "" && s.blobService != nil {
266 blobCtx, blobCancel := context.WithTimeout(ctx, 15*time.Second)
267 defer blobCancel()
268
269 blob, blobErr := s.blobService.UploadBlobFromURL(blobCtx, community, result.ThumbnailURL)
270 if blobErr != nil {
271 log.Printf("[POST-CREATE] Warning: Failed to upload thumbnail for %s: %v", uri, blobErr)
272 } else {
273 external["thumb"] = blob
274 log.Printf("[POST-CREATE] Uploaded thumbnail blob for %s", uri)
275 }
276 }
277 }
278
279 log.Printf("[POST-CREATE] Successfully enhanced embed with unfurl data (provider: %s, type: %s)",
280 result.Provider, result.Type)
281 }
282 }
283 }
284 }
285 }
286 }
287 }
288
289 // 11. Write to community's PDS repository
290 uri, cid, err := s.createPostOnPDS(ctx, community, postRecord)
291 if err != nil {
292 return nil, fmt.Errorf("failed to write post to PDS: %w", err)
293 }
294
295 // 12. Record aggregator post for rate limiting (non-Kagi aggregators only)
296 // Kagi is exempted from rate limiting via env var (temporary)
297 if isOtherAggregator && s.aggregatorService != nil {
298 if recordErr := s.aggregatorService.RecordAggregatorPost(ctx, req.AuthorDID, communityDID, uri, cid); recordErr != nil {
299 // Log but don't fail - post was already created successfully
300 log.Printf("[POST-CREATE] Warning: failed to record aggregator post for rate limiting: %v", recordErr)
301 }
302 }
303
304 // 13. Return response (AppView will index via Jetstream consumer)
305 log.Printf("[POST-CREATE] Author: %s (trustedKagi=%v, otherAggregator=%v), Community: %s, URI: %s",
306 req.AuthorDID, isTrustedKagi, isOtherAggregator, communityDID, uri)
307
308 return &CreatePostResponse{
309 URI: uri,
310 CID: cid,
311 }, nil
312}
313
314// validateCreateRequest validates basic input requirements
315func (s *postService) validateCreateRequest(req CreatePostRequest) error {
316 // Global content limits (from lexicon)
317 const (
318 maxContentLength = 100000 // 100k characters - matches social.coves.community.post lexicon
319 maxTitleLength = 3000 // 3k bytes
320 maxTitleGraphemes = 300 // 300 graphemes (simplified check)
321 )
322
323 // Validate community required
324 if req.Community == "" {
325 return NewValidationError("community", "community is required")
326 }
327
328 // Validate author DID set by handler
329 if req.AuthorDID == "" {
330 return NewValidationError("authorDid", "authorDid must be set from authenticated user")
331 }
332
333 // Validate content length
334 if req.Content != nil && len(*req.Content) > maxContentLength {
335 return NewValidationError("content",
336 fmt.Sprintf("content too long (max %d characters)", maxContentLength))
337 }
338
339 // Validate title length
340 if req.Title != nil {
341 if len(*req.Title) > maxTitleLength {
342 return NewValidationError("title",
343 fmt.Sprintf("title too long (max %d bytes)", maxTitleLength))
344 }
345 // Simplified grapheme check (actual implementation would need unicode library)
346 // For Alpha, byte length check is sufficient
347 }
348
349 // Validate content labels are from known values
350 if req.Labels != nil {
351 validLabels := map[string]bool{
352 "nsfw": true,
353 "spoiler": true,
354 "violence": true,
355 }
356 for _, label := range req.Labels.Values {
357 if !validLabels[label.Val] {
358 return NewValidationError("labels",
359 fmt.Sprintf("unknown content label: %s (valid: nsfw, spoiler, violence)", label.Val))
360 }
361 }
362 }
363
364 return nil
365}
366
367// createPostOnPDS writes a post record to the community's PDS repository
368// Uses com.atproto.repo.createRecord endpoint
369func (s *postService) createPostOnPDS(
370 ctx context.Context,
371 community *communities.Community,
372 record PostRecord,
373) (uri, cid string, err error) {
374 // Use community's PDS URL (not service default) for federated communities
375 // Each community can be hosted on a different PDS instance
376 pdsURL := community.PDSURL
377 if pdsURL == "" {
378 // Fallback to service default if community doesn't have a PDS URL
379 // (shouldn't happen in practice, but safe default)
380 pdsURL = s.pdsURL
381 }
382
383 // Build PDS endpoint URL
384 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", pdsURL)
385
386 // Build request payload
387 // IMPORTANT: repo is set to community DID, not author DID
388 // This writes the post to the community's repository
389 payload := map[string]interface{}{
390 "repo": community.DID, // Community's repository
391 "collection": "social.coves.community.post", // Collection type
392 "record": record, // The post record
393 // "rkey" omitted - PDS will auto-generate TID
394 }
395
396 // Marshal payload
397 jsonData, err := json.Marshal(payload)
398 if err != nil {
399 return "", "", fmt.Errorf("failed to marshal post payload: %w", err)
400 }
401
402 // Create HTTP request
403 req, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewBuffer(jsonData))
404 if err != nil {
405 return "", "", fmt.Errorf("failed to create PDS request: %w", err)
406 }
407
408 // Set headers (auth + content type)
409 req.Header.Set("Content-Type", "application/json")
410 req.Header.Set("Authorization", "Bearer "+community.PDSAccessToken)
411
412 // Extended timeout for write operations (30 seconds)
413 client := &http.Client{
414 Timeout: 30 * time.Second,
415 }
416
417 // Execute request
418 resp, err := client.Do(req)
419 if err != nil {
420 return "", "", fmt.Errorf("PDS request failed: %w", err)
421 }
422 defer func() {
423 if closeErr := resp.Body.Close(); closeErr != nil {
424 log.Printf("Warning: failed to close response body: %v", closeErr)
425 }
426 }()
427
428 // Read response body
429 body, err := io.ReadAll(resp.Body)
430 if err != nil {
431 return "", "", fmt.Errorf("failed to read PDS response: %w", err)
432 }
433
434 // Check for errors
435 if resp.StatusCode != http.StatusOK {
436 // Sanitize error body for logging (prevent sensitive data leakage)
437 bodyPreview := string(body)
438 if len(bodyPreview) > 200 {
439 bodyPreview = bodyPreview[:200] + "... (truncated)"
440 }
441 log.Printf("[POST-CREATE-ERROR] PDS Status: %d, Body: %s", resp.StatusCode, bodyPreview)
442
443 // Return truncated error (defense in depth - handler will mask this further)
444 return "", "", fmt.Errorf("PDS returned error %d: %s", resp.StatusCode, bodyPreview)
445 }
446
447 // Parse response
448 var result struct {
449 URI string `json:"uri"`
450 CID string `json:"cid"`
451 }
452 if err := json.Unmarshal(body, &result); err != nil {
453 return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
454 }
455
456 return result.URI, result.CID, nil
457}