A community based topic aggregation platform built on atproto
1package posts 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/core/aggregators" 6 "Coves/internal/core/blobs" 7 "Coves/internal/core/communities" 8 "Coves/internal/core/unfurl" 9 "bytes" 10 "context" 11 "encoding/json" 12 "fmt" 13 "io" 14 "log" 15 "net/http" 16 "os" 17 "time" 18) 19 20type postService struct { 21 repo Repository 22 communityService communities.Service 23 aggregatorService aggregators.Service 24 blobService blobs.Service 25 unfurlService unfurl.Service 26 pdsURL string 27} 28 29// NewPostService creates a new post service 30// aggregatorService, blobService, and unfurlService can be nil if not needed (e.g., in tests or minimal setups) 31func NewPostService( 32 repo Repository, 33 communityService communities.Service, 34 aggregatorService aggregators.Service, // Optional: can be nil 35 blobService blobs.Service, // Optional: can be nil 36 unfurlService unfurl.Service, // Optional: can be nil 37 pdsURL string, 38) Service { 39 return &postService{ 40 repo: repo, 41 communityService: communityService, 42 aggregatorService: aggregatorService, 43 blobService: blobService, 44 unfurlService: unfurlService, 45 pdsURL: pdsURL, 46 } 47} 48 49// CreatePost creates a new post in a community 50// Flow: 51// 1. Validate input 52// 2. Check if author is an aggregator (server-side validation using DID from JWT) 53// 3. If aggregator: validate authorization and rate limits, skip membership checks 54// 4. If user: resolve community and perform membership/ban validation 55// 5. Build post record 56// 6. Write to community's PDS repository 57// 7. If aggregator: record post for rate limiting 58// 8. Return URI/CID (AppView indexes asynchronously via Jetstream) 59func (s *postService) CreatePost(ctx context.Context, req CreatePostRequest) (*CreatePostResponse, error) { 60 // 1. Validate basic input (before DID checks to give clear validation errors) 61 if err := s.validateCreateRequest(req); err != nil { 62 return nil, err 63 } 64 65 // 2. SECURITY: Extract authenticated DID from context (set by JWT middleware) 66 // Defense-in-depth: verify service layer receives correct DID even if handler is bypassed 67 authenticatedDID := middleware.GetAuthenticatedDID(ctx) 68 if authenticatedDID == "" { 69 return nil, fmt.Errorf("no authenticated DID in context - authentication required") 70 } 71 72 // SECURITY: Verify request DID matches authenticated DID from JWT 73 // This prevents DID spoofing where a malicious client or compromised handler 74 // could provide a different DID than what was authenticated 75 if authenticatedDID != req.AuthorDID { 76 log.Printf("[SECURITY] DID mismatch: authenticated=%s, request=%s", authenticatedDID, req.AuthorDID) 77 return nil, fmt.Errorf("authenticated DID does not match author DID") 78 } 79 80 // 3. Determine actor type: Kagi aggregator, other aggregator, or regular user 81 kagiAggregatorDID := os.Getenv("KAGI_AGGREGATOR_DID") 82 isTrustedKagi := kagiAggregatorDID != "" && req.AuthorDID == kagiAggregatorDID 83 84 // Check if this is a non-Kagi aggregator (requires database lookup) 85 var isOtherAggregator bool 86 var err error 87 if !isTrustedKagi && s.aggregatorService != nil { 88 isOtherAggregator, err = s.aggregatorService.IsAggregator(ctx, req.AuthorDID) 89 if err != nil { 90 log.Printf("[POST-CREATE] Warning: failed to check if DID is aggregator: %v", err) 91 // Don't fail the request - treat as regular user if check fails 92 isOtherAggregator = false 93 } 94 } 95 96 // 4. Resolve community at-identifier (handle or DID) to DID 97 // This accepts both formats per atProto best practices: 98 // - Handles: !gardening.communities.coves.social 99 // - DIDs: did:plc:abc123 or did:web:coves.social 100 communityDID, err := s.communityService.ResolveCommunityIdentifier(ctx, req.Community) 101 if err != nil { 102 // Handle specific error types appropriately 103 if communities.IsNotFound(err) { 104 return nil, ErrCommunityNotFound 105 } 106 if communities.IsValidationError(err) { 107 // Pass through validation errors (invalid format, etc.) 108 return nil, NewValidationError("community", err.Error()) 109 } 110 // Infrastructure failures (DB errors, network issues) should be internal errors 111 // Don't leak internal details to client (e.g., "pq: connection refused") 112 return nil, fmt.Errorf("failed to resolve community identifier: %w", err) 113 } 114 115 // 5. AUTHORIZATION: For non-Kagi aggregators, validate authorization and rate limits 116 // Kagi is exempted from database checks via env var (temporary until XRPC endpoint is ready) 117 if isOtherAggregator && s.aggregatorService != nil { 118 if err := s.aggregatorService.ValidateAggregatorPost(ctx, req.AuthorDID, communityDID); err != nil { 119 log.Printf("[POST-CREATE] Aggregator authorization failed: %s -> %s: %v", req.AuthorDID, communityDID, err) 120 return nil, fmt.Errorf("aggregator not authorized: %w", err) 121 } 122 log.Printf("[POST-CREATE] Aggregator authorized: %s -> %s", req.AuthorDID, communityDID) 123 } 124 125 // 6. Fetch community from AppView (includes all metadata) 126 community, err := s.communityService.GetByDID(ctx, communityDID) 127 if err != nil { 128 if communities.IsNotFound(err) { 129 return nil, ErrCommunityNotFound 130 } 131 return nil, fmt.Errorf("failed to fetch community: %w", err) 132 } 133 134 // 7. Apply validation based on actor type (aggregator vs user) 135 if isTrustedKagi { 136 // TRUSTED AGGREGATOR VALIDATION FLOW 137 // Kagi aggregator is authorized via KAGI_AGGREGATOR_DID env var (temporary) 138 // TODO: Replace with proper XRPC aggregator authorization endpoint 139 log.Printf("[POST-CREATE] Trusted Kagi aggregator detected: %s posting to community: %s", req.AuthorDID, communityDID) 140 // Aggregators skip membership checks and visibility restrictions 141 // They are authorized services, not community members 142 } else if isOtherAggregator { 143 // OTHER AGGREGATOR VALIDATION FLOW 144 // Authorization and rate limits already validated above via ValidateAggregatorPost 145 log.Printf("[POST-CREATE] Authorized aggregator detected: %s posting to community: %s", req.AuthorDID, communityDID) 146 } else { 147 // USER VALIDATION FLOW 148 // Check community visibility (Alpha: public/unlisted only) 149 // Beta will add membership checks for private communities 150 if community.Visibility == "private" { 151 return nil, ErrNotAuthorized 152 } 153 } 154 155 // 8. Ensure community has fresh PDS credentials (token refresh if needed) 156 community, err = s.communityService.EnsureFreshToken(ctx, community) 157 if err != nil { 158 return nil, fmt.Errorf("failed to refresh community credentials: %w", err) 159 } 160 161 // 9. Build post record for PDS 162 postRecord := PostRecord{ 163 Type: "social.coves.community.post", 164 Community: communityDID, 165 Author: req.AuthorDID, 166 Title: req.Title, 167 Content: req.Content, 168 Facets: req.Facets, 169 Embed: req.Embed, // Start with user-provided embed 170 Labels: req.Labels, 171 OriginalAuthor: req.OriginalAuthor, 172 FederatedFrom: req.FederatedFrom, 173 Location: req.Location, 174 CreatedAt: time.Now().UTC().Format(time.RFC3339), 175 } 176 177 // 10. Validate and enhance external embeds 178 if postRecord.Embed != nil { 179 if embedType, ok := postRecord.Embed["$type"].(string); ok && embedType == "social.coves.embed.external" { 180 if external, ok := postRecord.Embed["external"].(map[string]interface{}); ok { 181 // SECURITY: Validate thumb field (must be blob, not URL string) 182 // This validation happens BEFORE unfurl to catch client errors early 183 if existingThumb := external["thumb"]; existingThumb != nil { 184 if thumbStr, isString := existingThumb.(string); isString { 185 return nil, NewValidationError("thumb", 186 fmt.Sprintf("thumb must be a blob reference (with $type, ref, mimeType, size), not URL string: %s", thumbStr)) 187 } 188 189 // Validate blob structure if provided 190 if thumbMap, isMap := existingThumb.(map[string]interface{}); isMap { 191 // Check for $type field 192 if thumbType, ok := thumbMap["$type"].(string); !ok || thumbType != "blob" { 193 return nil, NewValidationError("thumb", 194 fmt.Sprintf("thumb must have $type: blob (got: %v)", thumbType)) 195 } 196 // Check for required blob fields 197 if _, hasRef := thumbMap["ref"]; !hasRef { 198 return nil, NewValidationError("thumb", "thumb blob missing required 'ref' field") 199 } 200 if _, hasMimeType := thumbMap["mimeType"]; !hasMimeType { 201 return nil, NewValidationError("thumb", "thumb blob missing required 'mimeType' field") 202 } 203 log.Printf("[POST-CREATE] Client provided valid thumbnail blob") 204 } else { 205 return nil, NewValidationError("thumb", 206 fmt.Sprintf("thumb must be a blob object, got: %T", existingThumb)) 207 } 208 } 209 210 // TRUSTED AGGREGATOR: Allow Kagi aggregator to provide thumbnail URLs directly 211 // This bypasses unfurl for more accurate RSS-sourced thumbnails 212 if req.ThumbnailURL != nil && *req.ThumbnailURL != "" && isTrustedKagi { 213 log.Printf("[AGGREGATOR-THUMB] Trusted aggregator provided thumbnail: %s", *req.ThumbnailURL) 214 215 if s.blobService != nil { 216 blobCtx, blobCancel := context.WithTimeout(ctx, 15*time.Second) 217 defer blobCancel() 218 219 blob, blobErr := s.blobService.UploadBlobFromURL(blobCtx, community, *req.ThumbnailURL) 220 if blobErr != nil { 221 log.Printf("[AGGREGATOR-THUMB] Failed to upload thumbnail: %v", blobErr) 222 // No fallback - aggregators only use RSS feed thumbnails 223 } else { 224 external["thumb"] = blob 225 log.Printf("[AGGREGATOR-THUMB] Successfully uploaded thumbnail from trusted aggregator") 226 } 227 } 228 } 229 230 // Unfurl enhancement (optional, only if URL is supported) 231 // Skip unfurl for trusted aggregators - they provide their own metadata 232 if !isTrustedKagi { 233 if uri, ok := external["uri"].(string); ok && uri != "" { 234 // Check if we support unfurling this URL 235 if s.unfurlService != nil && s.unfurlService.IsSupported(uri) { 236 log.Printf("[POST-CREATE] Unfurling URL: %s", uri) 237 238 // Unfurl with timeout (non-fatal if it fails) 239 unfurlCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 240 defer cancel() 241 242 result, err := s.unfurlService.UnfurlURL(unfurlCtx, uri) 243 if err != nil { 244 // Log but don't fail - user can still post with manual metadata 245 log.Printf("[POST-CREATE] Warning: Failed to unfurl URL %s: %v", uri, err) 246 } else { 247 // Enhance embed with fetched metadata (only if client didn't provide) 248 // Note: We respect client-provided values, even empty strings 249 // If client sends title="", we assume they want no title 250 if external["title"] == nil { 251 external["title"] = result.Title 252 } 253 if external["description"] == nil { 254 external["description"] = result.Description 255 } 256 // Always set metadata fields (provider, domain, type) 257 external["embedType"] = result.Type 258 external["provider"] = result.Provider 259 external["domain"] = result.Domain 260 261 // Upload thumbnail from unfurl if client didn't provide one 262 // (Thumb validation already happened above) 263 if external["thumb"] == nil { 264 if result.ThumbnailURL != "" && s.blobService != nil { 265 blobCtx, blobCancel := context.WithTimeout(ctx, 15*time.Second) 266 defer blobCancel() 267 268 blob, blobErr := s.blobService.UploadBlobFromURL(blobCtx, community, result.ThumbnailURL) 269 if blobErr != nil { 270 log.Printf("[POST-CREATE] Warning: Failed to upload thumbnail for %s: %v", uri, blobErr) 271 } else { 272 external["thumb"] = blob 273 log.Printf("[POST-CREATE] Uploaded thumbnail blob for %s", uri) 274 } 275 } 276 } 277 278 log.Printf("[POST-CREATE] Successfully enhanced embed with unfurl data (provider: %s, type: %s)", 279 result.Provider, result.Type) 280 } 281 } 282 } 283 } 284 } 285 } 286 } 287 288 // 11. Write to community's PDS repository 289 uri, cid, err := s.createPostOnPDS(ctx, community, postRecord) 290 if err != nil { 291 return nil, fmt.Errorf("failed to write post to PDS: %w", err) 292 } 293 294 // 12. Record aggregator post for rate limiting (non-Kagi aggregators only) 295 // Kagi is exempted from rate limiting via env var (temporary) 296 if isOtherAggregator && s.aggregatorService != nil { 297 if recordErr := s.aggregatorService.RecordAggregatorPost(ctx, req.AuthorDID, communityDID, uri, cid); recordErr != nil { 298 // Log but don't fail - post was already created successfully 299 log.Printf("[POST-CREATE] Warning: failed to record aggregator post for rate limiting: %v", recordErr) 300 } 301 } 302 303 // 13. Return response (AppView will index via Jetstream consumer) 304 log.Printf("[POST-CREATE] Author: %s (trustedKagi=%v, otherAggregator=%v), Community: %s, URI: %s", 305 req.AuthorDID, isTrustedKagi, isOtherAggregator, communityDID, uri) 306 307 return &CreatePostResponse{ 308 URI: uri, 309 CID: cid, 310 }, nil 311} 312 313// validateCreateRequest validates basic input requirements 314func (s *postService) validateCreateRequest(req CreatePostRequest) error { 315 // Global content limits (from lexicon) 316 const ( 317 maxContentLength = 100000 // 100k characters - matches social.coves.community.post lexicon 318 maxTitleLength = 3000 // 3k bytes 319 maxTitleGraphemes = 300 // 300 graphemes (simplified check) 320 ) 321 322 // Validate community required 323 if req.Community == "" { 324 return NewValidationError("community", "community is required") 325 } 326 327 // Validate author DID set by handler 328 if req.AuthorDID == "" { 329 return NewValidationError("authorDid", "authorDid must be set from authenticated user") 330 } 331 332 // Validate content length 333 if req.Content != nil && len(*req.Content) > maxContentLength { 334 return NewValidationError("content", 335 fmt.Sprintf("content too long (max %d characters)", maxContentLength)) 336 } 337 338 // Validate title length 339 if req.Title != nil { 340 if len(*req.Title) > maxTitleLength { 341 return NewValidationError("title", 342 fmt.Sprintf("title too long (max %d bytes)", maxTitleLength)) 343 } 344 // Simplified grapheme check (actual implementation would need unicode library) 345 // For Alpha, byte length check is sufficient 346 } 347 348 // Validate content labels are from known values 349 if req.Labels != nil { 350 validLabels := map[string]bool{ 351 "nsfw": true, 352 "spoiler": true, 353 "violence": true, 354 } 355 for _, label := range req.Labels.Values { 356 if !validLabels[label.Val] { 357 return NewValidationError("labels", 358 fmt.Sprintf("unknown content label: %s (valid: nsfw, spoiler, violence)", label.Val)) 359 } 360 } 361 } 362 363 return nil 364} 365 366// createPostOnPDS writes a post record to the community's PDS repository 367// Uses com.atproto.repo.createRecord endpoint 368func (s *postService) createPostOnPDS( 369 ctx context.Context, 370 community *communities.Community, 371 record PostRecord, 372) (uri, cid string, err error) { 373 // Use community's PDS URL (not service default) for federated communities 374 // Each community can be hosted on a different PDS instance 375 pdsURL := community.PDSURL 376 if pdsURL == "" { 377 // Fallback to service default if community doesn't have a PDS URL 378 // (shouldn't happen in practice, but safe default) 379 pdsURL = s.pdsURL 380 } 381 382 // Build PDS endpoint URL 383 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", pdsURL) 384 385 // Build request payload 386 // IMPORTANT: repo is set to community DID, not author DID 387 // This writes the post to the community's repository 388 payload := map[string]interface{}{ 389 "repo": community.DID, // Community's repository 390 "collection": "social.coves.community.post", // Collection type 391 "record": record, // The post record 392 // "rkey" omitted - PDS will auto-generate TID 393 } 394 395 // Marshal payload 396 jsonData, err := json.Marshal(payload) 397 if err != nil { 398 return "", "", fmt.Errorf("failed to marshal post payload: %w", err) 399 } 400 401 // Create HTTP request 402 req, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewBuffer(jsonData)) 403 if err != nil { 404 return "", "", fmt.Errorf("failed to create PDS request: %w", err) 405 } 406 407 // Set headers (auth + content type) 408 req.Header.Set("Content-Type", "application/json") 409 req.Header.Set("Authorization", "Bearer "+community.PDSAccessToken) 410 411 // Extended timeout for write operations (30 seconds) 412 client := &http.Client{ 413 Timeout: 30 * time.Second, 414 } 415 416 // Execute request 417 resp, err := client.Do(req) 418 if err != nil { 419 return "", "", fmt.Errorf("PDS request failed: %w", err) 420 } 421 defer func() { 422 if closeErr := resp.Body.Close(); closeErr != nil { 423 log.Printf("Warning: failed to close response body: %v", closeErr) 424 } 425 }() 426 427 // Read response body 428 body, err := io.ReadAll(resp.Body) 429 if err != nil { 430 return "", "", fmt.Errorf("failed to read PDS response: %w", err) 431 } 432 433 // Check for errors 434 if resp.StatusCode != http.StatusOK { 435 // Sanitize error body for logging (prevent sensitive data leakage) 436 bodyPreview := string(body) 437 if len(bodyPreview) > 200 { 438 bodyPreview = bodyPreview[:200] + "... (truncated)" 439 } 440 log.Printf("[POST-CREATE-ERROR] PDS Status: %d, Body: %s", resp.StatusCode, bodyPreview) 441 442 // Return truncated error (defense in depth - handler will mask this further) 443 return "", "", fmt.Errorf("PDS returned error %d: %s", resp.StatusCode, bodyPreview) 444 } 445 446 // Parse response 447 var result struct { 448 URI string `json:"uri"` 449 CID string `json:"cid"` 450 } 451 if err := json.Unmarshal(body, &result); err != nil { 452 return "", "", fmt.Errorf("failed to parse PDS response: %w", err) 453 } 454 455 return result.URI, result.CID, nil 456}