A community based topic aggregation platform built on atproto
1package posts
2
3import (
4 "Coves/internal/api/middleware"
5 "Coves/internal/core/aggregators"
6 "Coves/internal/core/blobs"
7 "Coves/internal/core/communities"
8 "Coves/internal/core/unfurl"
9 "bytes"
10 "context"
11 "encoding/json"
12 "fmt"
13 "io"
14 "log"
15 "net/http"
16 "os"
17 "time"
18)
19
20type postService struct {
21 repo Repository
22 communityService communities.Service
23 aggregatorService aggregators.Service
24 blobService blobs.Service
25 unfurlService unfurl.Service
26 pdsURL string
27}
28
29// NewPostService creates a new post service
30// aggregatorService, blobService, and unfurlService can be nil if not needed (e.g., in tests or minimal setups)
31func NewPostService(
32 repo Repository,
33 communityService communities.Service,
34 aggregatorService aggregators.Service, // Optional: can be nil
35 blobService blobs.Service, // Optional: can be nil
36 unfurlService unfurl.Service, // Optional: can be nil
37 pdsURL string,
38) Service {
39 return &postService{
40 repo: repo,
41 communityService: communityService,
42 aggregatorService: aggregatorService,
43 blobService: blobService,
44 unfurlService: unfurlService,
45 pdsURL: pdsURL,
46 }
47}
48
49// CreatePost creates a new post in a community
50// Flow:
51// 1. Validate input
52// 2. Check if author is an aggregator (server-side validation using DID from JWT)
53// 3. If aggregator: validate authorization and rate limits, skip membership checks
54// 4. If user: resolve community and perform membership/ban validation
55// 5. Build post record
56// 6. Write to community's PDS repository
57// 7. If aggregator: record post for rate limiting
58// 8. Return URI/CID (AppView indexes asynchronously via Jetstream)
59func (s *postService) CreatePost(ctx context.Context, req CreatePostRequest) (*CreatePostResponse, error) {
60 // 1. Validate basic input (before DID checks to give clear validation errors)
61 if err := s.validateCreateRequest(req); err != nil {
62 return nil, err
63 }
64
65 // 2. SECURITY: Extract authenticated DID from context (set by JWT middleware)
66 // Defense-in-depth: verify service layer receives correct DID even if handler is bypassed
67 authenticatedDID := middleware.GetAuthenticatedDID(ctx)
68 if authenticatedDID == "" {
69 return nil, fmt.Errorf("no authenticated DID in context - authentication required")
70 }
71
72 // SECURITY: Verify request DID matches authenticated DID from JWT
73 // This prevents DID spoofing where a malicious client or compromised handler
74 // could provide a different DID than what was authenticated
75 if authenticatedDID != req.AuthorDID {
76 log.Printf("[SECURITY] DID mismatch: authenticated=%s, request=%s", authenticatedDID, req.AuthorDID)
77 return nil, fmt.Errorf("authenticated DID does not match author DID")
78 }
79
80 // 3. Determine actor type: Kagi aggregator, other aggregator, or regular user
81 kagiAggregatorDID := os.Getenv("KAGI_AGGREGATOR_DID")
82 isTrustedKagi := kagiAggregatorDID != "" && req.AuthorDID == kagiAggregatorDID
83
84 // Check if this is a non-Kagi aggregator (requires database lookup)
85 var isOtherAggregator bool
86 var err error
87 if !isTrustedKagi && s.aggregatorService != nil {
88 isOtherAggregator, err = s.aggregatorService.IsAggregator(ctx, req.AuthorDID)
89 if err != nil {
90 log.Printf("[POST-CREATE] Warning: failed to check if DID is aggregator: %v", err)
91 // Don't fail the request - treat as regular user if check fails
92 isOtherAggregator = false
93 }
94 }
95
96 // 4. Resolve community at-identifier (handle or DID) to DID
97 // This accepts both formats per atProto best practices:
98 // - Handles: !gardening.communities.coves.social
99 // - DIDs: did:plc:abc123 or did:web:coves.social
100 communityDID, err := s.communityService.ResolveCommunityIdentifier(ctx, req.Community)
101 if err != nil {
102 // Handle specific error types appropriately
103 if communities.IsNotFound(err) {
104 return nil, ErrCommunityNotFound
105 }
106 if communities.IsValidationError(err) {
107 // Pass through validation errors (invalid format, etc.)
108 return nil, NewValidationError("community", err.Error())
109 }
110 // Infrastructure failures (DB errors, network issues) should be internal errors
111 // Don't leak internal details to client (e.g., "pq: connection refused")
112 return nil, fmt.Errorf("failed to resolve community identifier: %w", err)
113 }
114
115 // 5. AUTHORIZATION: For non-Kagi aggregators, validate authorization and rate limits
116 // Kagi is exempted from database checks via env var (temporary until XRPC endpoint is ready)
117 if isOtherAggregator && s.aggregatorService != nil {
118 if err := s.aggregatorService.ValidateAggregatorPost(ctx, req.AuthorDID, communityDID); err != nil {
119 log.Printf("[POST-CREATE] Aggregator authorization failed: %s -> %s: %v", req.AuthorDID, communityDID, err)
120 return nil, fmt.Errorf("aggregator not authorized: %w", err)
121 }
122 log.Printf("[POST-CREATE] Aggregator authorized: %s -> %s", req.AuthorDID, communityDID)
123 }
124
125 // 6. Fetch community from AppView (includes all metadata)
126 community, err := s.communityService.GetByDID(ctx, communityDID)
127 if err != nil {
128 if communities.IsNotFound(err) {
129 return nil, ErrCommunityNotFound
130 }
131 return nil, fmt.Errorf("failed to fetch community: %w", err)
132 }
133
134 // 7. Apply validation based on actor type (aggregator vs user)
135 if isTrustedKagi {
136 // TRUSTED AGGREGATOR VALIDATION FLOW
137 // Kagi aggregator is authorized via KAGI_AGGREGATOR_DID env var (temporary)
138 // TODO: Replace with proper XRPC aggregator authorization endpoint
139 log.Printf("[POST-CREATE] Trusted Kagi aggregator detected: %s posting to community: %s", req.AuthorDID, communityDID)
140 // Aggregators skip membership checks and visibility restrictions
141 // They are authorized services, not community members
142 } else if isOtherAggregator {
143 // OTHER AGGREGATOR VALIDATION FLOW
144 // Authorization and rate limits already validated above via ValidateAggregatorPost
145 log.Printf("[POST-CREATE] Authorized aggregator detected: %s posting to community: %s", req.AuthorDID, communityDID)
146 } else {
147 // USER VALIDATION FLOW
148 // Check community visibility (Alpha: public/unlisted only)
149 // Beta will add membership checks for private communities
150 if community.Visibility == "private" {
151 return nil, ErrNotAuthorized
152 }
153 }
154
155 // 8. Ensure community has fresh PDS credentials (token refresh if needed)
156 community, err = s.communityService.EnsureFreshToken(ctx, community)
157 if err != nil {
158 return nil, fmt.Errorf("failed to refresh community credentials: %w", err)
159 }
160
161 // 9. Build post record for PDS
162 postRecord := PostRecord{
163 Type: "social.coves.community.post",
164 Community: communityDID,
165 Author: req.AuthorDID,
166 Title: req.Title,
167 Content: req.Content,
168 Facets: req.Facets,
169 Embed: req.Embed, // Start with user-provided embed
170 Labels: req.Labels,
171 OriginalAuthor: req.OriginalAuthor,
172 FederatedFrom: req.FederatedFrom,
173 Location: req.Location,
174 CreatedAt: time.Now().UTC().Format(time.RFC3339),
175 }
176
177 // 10. Validate and enhance external embeds
178 if postRecord.Embed != nil {
179 if embedType, ok := postRecord.Embed["$type"].(string); ok && embedType == "social.coves.embed.external" {
180 if external, ok := postRecord.Embed["external"].(map[string]interface{}); ok {
181 // SECURITY: Validate thumb field (must be blob, not URL string)
182 // This validation happens BEFORE unfurl to catch client errors early
183 if existingThumb := external["thumb"]; existingThumb != nil {
184 if thumbStr, isString := existingThumb.(string); isString {
185 return nil, NewValidationError("thumb",
186 fmt.Sprintf("thumb must be a blob reference (with $type, ref, mimeType, size), not URL string: %s", thumbStr))
187 }
188
189 // Validate blob structure if provided
190 if thumbMap, isMap := existingThumb.(map[string]interface{}); isMap {
191 // Check for $type field
192 if thumbType, ok := thumbMap["$type"].(string); !ok || thumbType != "blob" {
193 return nil, NewValidationError("thumb",
194 fmt.Sprintf("thumb must have $type: blob (got: %v)", thumbType))
195 }
196 // Check for required blob fields
197 if _, hasRef := thumbMap["ref"]; !hasRef {
198 return nil, NewValidationError("thumb", "thumb blob missing required 'ref' field")
199 }
200 if _, hasMimeType := thumbMap["mimeType"]; !hasMimeType {
201 return nil, NewValidationError("thumb", "thumb blob missing required 'mimeType' field")
202 }
203 log.Printf("[POST-CREATE] Client provided valid thumbnail blob")
204 } else {
205 return nil, NewValidationError("thumb",
206 fmt.Sprintf("thumb must be a blob object, got: %T", existingThumb))
207 }
208 }
209
210 // TRUSTED AGGREGATOR: Allow Kagi aggregator to provide thumbnail URLs directly
211 // This bypasses unfurl for more accurate RSS-sourced thumbnails
212 if req.ThumbnailURL != nil && *req.ThumbnailURL != "" && isTrustedKagi {
213 log.Printf("[AGGREGATOR-THUMB] Trusted aggregator provided thumbnail: %s", *req.ThumbnailURL)
214
215 if s.blobService != nil {
216 blobCtx, blobCancel := context.WithTimeout(ctx, 15*time.Second)
217 defer blobCancel()
218
219 blob, blobErr := s.blobService.UploadBlobFromURL(blobCtx, community, *req.ThumbnailURL)
220 if blobErr != nil {
221 log.Printf("[AGGREGATOR-THUMB] Failed to upload thumbnail: %v", blobErr)
222 // No fallback - aggregators only use RSS feed thumbnails
223 } else {
224 external["thumb"] = blob
225 log.Printf("[AGGREGATOR-THUMB] Successfully uploaded thumbnail from trusted aggregator")
226 }
227 }
228 }
229
230 // Unfurl enhancement (optional, only if URL is supported)
231 // Skip unfurl for trusted aggregators - they provide their own metadata
232 if !isTrustedKagi {
233 if uri, ok := external["uri"].(string); ok && uri != "" {
234 // Check if we support unfurling this URL
235 if s.unfurlService != nil && s.unfurlService.IsSupported(uri) {
236 log.Printf("[POST-CREATE] Unfurling URL: %s", uri)
237
238 // Unfurl with timeout (non-fatal if it fails)
239 unfurlCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
240 defer cancel()
241
242 result, err := s.unfurlService.UnfurlURL(unfurlCtx, uri)
243 if err != nil {
244 // Log but don't fail - user can still post with manual metadata
245 log.Printf("[POST-CREATE] Warning: Failed to unfurl URL %s: %v", uri, err)
246 } else {
247 // Enhance embed with fetched metadata (only if client didn't provide)
248 // Note: We respect client-provided values, even empty strings
249 // If client sends title="", we assume they want no title
250 if external["title"] == nil {
251 external["title"] = result.Title
252 }
253 if external["description"] == nil {
254 external["description"] = result.Description
255 }
256 // Always set metadata fields (provider, domain, type)
257 external["embedType"] = result.Type
258 external["provider"] = result.Provider
259 external["domain"] = result.Domain
260
261 // Upload thumbnail from unfurl if client didn't provide one
262 // (Thumb validation already happened above)
263 if external["thumb"] == nil {
264 if result.ThumbnailURL != "" && s.blobService != nil {
265 blobCtx, blobCancel := context.WithTimeout(ctx, 15*time.Second)
266 defer blobCancel()
267
268 blob, blobErr := s.blobService.UploadBlobFromURL(blobCtx, community, result.ThumbnailURL)
269 if blobErr != nil {
270 log.Printf("[POST-CREATE] Warning: Failed to upload thumbnail for %s: %v", uri, blobErr)
271 } else {
272 external["thumb"] = blob
273 log.Printf("[POST-CREATE] Uploaded thumbnail blob for %s", uri)
274 }
275 }
276 }
277
278 log.Printf("[POST-CREATE] Successfully enhanced embed with unfurl data (provider: %s, type: %s)",
279 result.Provider, result.Type)
280 }
281 }
282 }
283 }
284 }
285 }
286 }
287
288 // 11. Write to community's PDS repository
289 uri, cid, err := s.createPostOnPDS(ctx, community, postRecord)
290 if err != nil {
291 return nil, fmt.Errorf("failed to write post to PDS: %w", err)
292 }
293
294 // 12. Record aggregator post for rate limiting (non-Kagi aggregators only)
295 // Kagi is exempted from rate limiting via env var (temporary)
296 if isOtherAggregator && s.aggregatorService != nil {
297 if recordErr := s.aggregatorService.RecordAggregatorPost(ctx, req.AuthorDID, communityDID, uri, cid); recordErr != nil {
298 // Log but don't fail - post was already created successfully
299 log.Printf("[POST-CREATE] Warning: failed to record aggregator post for rate limiting: %v", recordErr)
300 }
301 }
302
303 // 13. Return response (AppView will index via Jetstream consumer)
304 log.Printf("[POST-CREATE] Author: %s (trustedKagi=%v, otherAggregator=%v), Community: %s, URI: %s",
305 req.AuthorDID, isTrustedKagi, isOtherAggregator, communityDID, uri)
306
307 return &CreatePostResponse{
308 URI: uri,
309 CID: cid,
310 }, nil
311}
312
313// validateCreateRequest validates basic input requirements
314func (s *postService) validateCreateRequest(req CreatePostRequest) error {
315 // Global content limits (from lexicon)
316 const (
317 maxContentLength = 100000 // 100k characters - matches social.coves.community.post lexicon
318 maxTitleLength = 3000 // 3k bytes
319 maxTitleGraphemes = 300 // 300 graphemes (simplified check)
320 )
321
322 // Validate community required
323 if req.Community == "" {
324 return NewValidationError("community", "community is required")
325 }
326
327 // Validate author DID set by handler
328 if req.AuthorDID == "" {
329 return NewValidationError("authorDid", "authorDid must be set from authenticated user")
330 }
331
332 // Validate content length
333 if req.Content != nil && len(*req.Content) > maxContentLength {
334 return NewValidationError("content",
335 fmt.Sprintf("content too long (max %d characters)", maxContentLength))
336 }
337
338 // Validate title length
339 if req.Title != nil {
340 if len(*req.Title) > maxTitleLength {
341 return NewValidationError("title",
342 fmt.Sprintf("title too long (max %d bytes)", maxTitleLength))
343 }
344 // Simplified grapheme check (actual implementation would need unicode library)
345 // For Alpha, byte length check is sufficient
346 }
347
348 // Validate content labels are from known values
349 if req.Labels != nil {
350 validLabels := map[string]bool{
351 "nsfw": true,
352 "spoiler": true,
353 "violence": true,
354 }
355 for _, label := range req.Labels.Values {
356 if !validLabels[label.Val] {
357 return NewValidationError("labels",
358 fmt.Sprintf("unknown content label: %s (valid: nsfw, spoiler, violence)", label.Val))
359 }
360 }
361 }
362
363 return nil
364}
365
366// createPostOnPDS writes a post record to the community's PDS repository
367// Uses com.atproto.repo.createRecord endpoint
368func (s *postService) createPostOnPDS(
369 ctx context.Context,
370 community *communities.Community,
371 record PostRecord,
372) (uri, cid string, err error) {
373 // Use community's PDS URL (not service default) for federated communities
374 // Each community can be hosted on a different PDS instance
375 pdsURL := community.PDSURL
376 if pdsURL == "" {
377 // Fallback to service default if community doesn't have a PDS URL
378 // (shouldn't happen in practice, but safe default)
379 pdsURL = s.pdsURL
380 }
381
382 // Build PDS endpoint URL
383 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", pdsURL)
384
385 // Build request payload
386 // IMPORTANT: repo is set to community DID, not author DID
387 // This writes the post to the community's repository
388 payload := map[string]interface{}{
389 "repo": community.DID, // Community's repository
390 "collection": "social.coves.community.post", // Collection type
391 "record": record, // The post record
392 // "rkey" omitted - PDS will auto-generate TID
393 }
394
395 // Marshal payload
396 jsonData, err := json.Marshal(payload)
397 if err != nil {
398 return "", "", fmt.Errorf("failed to marshal post payload: %w", err)
399 }
400
401 // Create HTTP request
402 req, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewBuffer(jsonData))
403 if err != nil {
404 return "", "", fmt.Errorf("failed to create PDS request: %w", err)
405 }
406
407 // Set headers (auth + content type)
408 req.Header.Set("Content-Type", "application/json")
409 req.Header.Set("Authorization", "Bearer "+community.PDSAccessToken)
410
411 // Extended timeout for write operations (30 seconds)
412 client := &http.Client{
413 Timeout: 30 * time.Second,
414 }
415
416 // Execute request
417 resp, err := client.Do(req)
418 if err != nil {
419 return "", "", fmt.Errorf("PDS request failed: %w", err)
420 }
421 defer func() {
422 if closeErr := resp.Body.Close(); closeErr != nil {
423 log.Printf("Warning: failed to close response body: %v", closeErr)
424 }
425 }()
426
427 // Read response body
428 body, err := io.ReadAll(resp.Body)
429 if err != nil {
430 return "", "", fmt.Errorf("failed to read PDS response: %w", err)
431 }
432
433 // Check for errors
434 if resp.StatusCode != http.StatusOK {
435 // Sanitize error body for logging (prevent sensitive data leakage)
436 bodyPreview := string(body)
437 if len(bodyPreview) > 200 {
438 bodyPreview = bodyPreview[:200] + "... (truncated)"
439 }
440 log.Printf("[POST-CREATE-ERROR] PDS Status: %d, Body: %s", resp.StatusCode, bodyPreview)
441
442 // Return truncated error (defense in depth - handler will mask this further)
443 return "", "", fmt.Errorf("PDS returned error %d: %s", resp.StatusCode, bodyPreview)
444 }
445
446 // Parse response
447 var result struct {
448 URI string `json:"uri"`
449 CID string `json:"cid"`
450 }
451 if err := json.Unmarshal(body, &result); err != nil {
452 return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
453 }
454
455 return result.URI, result.CID, nil
456}