A community-based topic aggregation platform built on atproto
1package posts 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log" 10 "net/http" 11 "os" 12 "time" 13 14 "Coves/internal/api/middleware" 15 "Coves/internal/core/aggregators" 16 "Coves/internal/core/blobs" 17 "Coves/internal/core/communities" 18 "Coves/internal/core/unfurl" 19) 20 21type postService struct { 22 repo Repository 23 communityService communities.Service 24 aggregatorService aggregators.Service 25 blobService blobs.Service 26 unfurlService unfurl.Service 27 pdsURL string 28} 29 30// NewPostService creates a new post service 31// aggregatorService, blobService, and unfurlService can be nil if not needed (e.g., in tests or minimal setups) 32func NewPostService( 33 repo Repository, 34 communityService communities.Service, 35 aggregatorService aggregators.Service, // Optional: can be nil 36 blobService blobs.Service, // Optional: can be nil 37 unfurlService unfurl.Service, // Optional: can be nil 38 pdsURL string, 39) Service { 40 return &postService{ 41 repo: repo, 42 communityService: communityService, 43 aggregatorService: aggregatorService, 44 blobService: blobService, 45 unfurlService: unfurlService, 46 pdsURL: pdsURL, 47 } 48} 49 50// CreatePost creates a new post in a community 51// Flow: 52// 1. Validate input 53// 2. Check if author is an aggregator (server-side validation using DID from JWT) 54// 3. If aggregator: validate authorization and rate limits, skip membership checks 55// 4. If user: resolve community and perform membership/ban validation 56// 5. Build post record 57// 6. Write to community's PDS repository 58// 7. If aggregator: record post for rate limiting 59// 8. Return URI/CID (AppView indexes asynchronously via Jetstream) 60func (s *postService) CreatePost(ctx context.Context, req CreatePostRequest) (*CreatePostResponse, error) { 61 // 1. Validate basic input (before DID checks to give clear validation errors) 62 if err := s.validateCreateRequest(req); err != nil { 63 return nil, err 64 } 65 66 // 2. 
SECURITY: Extract authenticated DID from context (set by JWT middleware) 67 // Defense-in-depth: verify service layer receives correct DID even if handler is bypassed 68 authenticatedDID := middleware.GetAuthenticatedDID(ctx) 69 if authenticatedDID == "" { 70 return nil, fmt.Errorf("no authenticated DID in context - authentication required") 71 } 72 73 // SECURITY: Verify request DID matches authenticated DID from JWT 74 // This prevents DID spoofing where a malicious client or compromised handler 75 // could provide a different DID than what was authenticated 76 if authenticatedDID != req.AuthorDID { 77 log.Printf("[SECURITY] DID mismatch: authenticated=%s, request=%s", authenticatedDID, req.AuthorDID) 78 return nil, fmt.Errorf("authenticated DID does not match author DID") 79 } 80 81 // 3. Determine actor type: Kagi aggregator, other aggregator, or regular user 82 kagiAggregatorDID := os.Getenv("KAGI_AGGREGATOR_DID") 83 isTrustedKagi := kagiAggregatorDID != "" && req.AuthorDID == kagiAggregatorDID 84 85 // Check if this is a non-Kagi aggregator (requires database lookup) 86 var isOtherAggregator bool 87 var err error 88 if !isTrustedKagi && s.aggregatorService != nil { 89 isOtherAggregator, err = s.aggregatorService.IsAggregator(ctx, req.AuthorDID) 90 if err != nil { 91 log.Printf("[POST-CREATE] Warning: failed to check if DID is aggregator: %v", err) 92 // Don't fail the request - treat as regular user if check fails 93 isOtherAggregator = false 94 } 95 } 96 97 // 4. 
Resolve community at-identifier (handle or DID) to DID 98 // This accepts both formats per atProto best practices: 99 // - Handles: !gardening.communities.coves.social 100 // - DIDs: did:plc:abc123 or did:web:coves.social 101 communityDID, err := s.communityService.ResolveCommunityIdentifier(ctx, req.Community) 102 if err != nil { 103 // Handle specific error types appropriately 104 if communities.IsNotFound(err) { 105 return nil, ErrCommunityNotFound 106 } 107 if communities.IsValidationError(err) { 108 // Pass through validation errors (invalid format, etc.) 109 return nil, NewValidationError("community", err.Error()) 110 } 111 // Infrastructure failures (DB errors, network issues) should be internal errors 112 // Don't leak internal details to client (e.g., "pq: connection refused") 113 return nil, fmt.Errorf("failed to resolve community identifier: %w", err) 114 } 115 116 // 5. AUTHORIZATION: For non-Kagi aggregators, validate authorization and rate limits 117 // Kagi is exempted from database checks via env var (temporary until XRPC endpoint is ready) 118 if isOtherAggregator && s.aggregatorService != nil { 119 if err := s.aggregatorService.ValidateAggregatorPost(ctx, req.AuthorDID, communityDID); err != nil { 120 log.Printf("[POST-CREATE] Aggregator authorization failed: %s -> %s: %v", req.AuthorDID, communityDID, err) 121 return nil, fmt.Errorf("aggregator not authorized: %w", err) 122 } 123 log.Printf("[POST-CREATE] Aggregator authorized: %s -> %s", req.AuthorDID, communityDID) 124 } 125 126 // 6. Fetch community from AppView (includes all metadata) 127 community, err := s.communityService.GetByDID(ctx, communityDID) 128 if err != nil { 129 if communities.IsNotFound(err) { 130 return nil, ErrCommunityNotFound 131 } 132 return nil, fmt.Errorf("failed to fetch community: %w", err) 133 } 134 135 // 7. 
Apply validation based on actor type (aggregator vs user) 136 if isTrustedKagi { 137 // TRUSTED AGGREGATOR VALIDATION FLOW 138 // Kagi aggregator is authorized via KAGI_AGGREGATOR_DID env var (temporary) 139 // TODO: Replace with proper XRPC aggregator authorization endpoint 140 log.Printf("[POST-CREATE] Trusted Kagi aggregator detected: %s posting to community: %s", req.AuthorDID, communityDID) 141 // Aggregators skip membership checks and visibility restrictions 142 // They are authorized services, not community members 143 } else if isOtherAggregator { 144 // OTHER AGGREGATOR VALIDATION FLOW 145 // Authorization and rate limits already validated above via ValidateAggregatorPost 146 log.Printf("[POST-CREATE] Authorized aggregator detected: %s posting to community: %s", req.AuthorDID, communityDID) 147 } else { 148 // USER VALIDATION FLOW 149 // Check community visibility (Alpha: public/unlisted only) 150 // Beta will add membership checks for private communities 151 if community.Visibility == "private" { 152 return nil, ErrNotAuthorized 153 } 154 } 155 156 // 8. Ensure community has fresh PDS credentials (token refresh if needed) 157 community, err = s.communityService.EnsureFreshToken(ctx, community) 158 if err != nil { 159 return nil, fmt.Errorf("failed to refresh community credentials: %w", err) 160 } 161 162 // 9. Build post record for PDS 163 postRecord := PostRecord{ 164 Type: "social.coves.community.post", 165 Community: communityDID, 166 Author: req.AuthorDID, 167 Title: req.Title, 168 Content: req.Content, 169 Facets: req.Facets, 170 Embed: req.Embed, // Start with user-provided embed 171 Labels: req.Labels, 172 OriginalAuthor: req.OriginalAuthor, 173 FederatedFrom: req.FederatedFrom, 174 Location: req.Location, 175 CreatedAt: time.Now().UTC().Format(time.RFC3339), 176 } 177 178 // 10. 
Validate and enhance external embeds 179 if postRecord.Embed != nil { 180 if embedType, ok := postRecord.Embed["$type"].(string); ok && embedType == "social.coves.embed.external" { 181 if external, ok := postRecord.Embed["external"].(map[string]interface{}); ok { 182 // SECURITY: Validate thumb field (must be blob, not URL string) 183 // This validation happens BEFORE unfurl to catch client errors early 184 if existingThumb := external["thumb"]; existingThumb != nil { 185 if thumbStr, isString := existingThumb.(string); isString { 186 return nil, NewValidationError("thumb", 187 fmt.Sprintf("thumb must be a blob reference (with $type, ref, mimeType, size), not URL string: %s", thumbStr)) 188 } 189 190 // Validate blob structure if provided 191 if thumbMap, isMap := existingThumb.(map[string]interface{}); isMap { 192 // Check for $type field 193 if thumbType, ok := thumbMap["$type"].(string); !ok || thumbType != "blob" { 194 return nil, NewValidationError("thumb", 195 fmt.Sprintf("thumb must have $type: blob (got: %v)", thumbType)) 196 } 197 // Check for required blob fields 198 if _, hasRef := thumbMap["ref"]; !hasRef { 199 return nil, NewValidationError("thumb", "thumb blob missing required 'ref' field") 200 } 201 if _, hasMimeType := thumbMap["mimeType"]; !hasMimeType { 202 return nil, NewValidationError("thumb", "thumb blob missing required 'mimeType' field") 203 } 204 log.Printf("[POST-CREATE] Client provided valid thumbnail blob") 205 } else { 206 return nil, NewValidationError("thumb", 207 fmt.Sprintf("thumb must be a blob object, got: %T", existingThumb)) 208 } 209 } 210 211 // TRUSTED AGGREGATOR: Allow Kagi aggregator to provide thumbnail URLs directly 212 // This bypasses unfurl for more accurate RSS-sourced thumbnails 213 if req.ThumbnailURL != nil && *req.ThumbnailURL != "" && isTrustedKagi { 214 log.Printf("[AGGREGATOR-THUMB] Trusted aggregator provided thumbnail: %s", *req.ThumbnailURL) 215 216 if s.blobService != nil { 217 blobCtx, blobCancel := 
context.WithTimeout(ctx, 15*time.Second) 218 defer blobCancel() 219 220 blob, blobErr := s.blobService.UploadBlobFromURL(blobCtx, community, *req.ThumbnailURL) 221 if blobErr != nil { 222 log.Printf("[AGGREGATOR-THUMB] Failed to upload thumbnail: %v", blobErr) 223 // No fallback - aggregators only use RSS feed thumbnails 224 } else { 225 external["thumb"] = blob 226 log.Printf("[AGGREGATOR-THUMB] Successfully uploaded thumbnail from trusted aggregator") 227 } 228 } 229 } 230 231 // Unfurl enhancement (optional, only if URL is supported) 232 // Skip unfurl for trusted aggregators - they provide their own metadata 233 if !isTrustedKagi { 234 if uri, ok := external["uri"].(string); ok && uri != "" { 235 // Check if we support unfurling this URL 236 if s.unfurlService != nil && s.unfurlService.IsSupported(uri) { 237 log.Printf("[POST-CREATE] Unfurling URL: %s", uri) 238 239 // Unfurl with timeout (non-fatal if it fails) 240 unfurlCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 241 defer cancel() 242 243 result, err := s.unfurlService.UnfurlURL(unfurlCtx, uri) 244 if err != nil { 245 // Log but don't fail - user can still post with manual metadata 246 log.Printf("[POST-CREATE] Warning: Failed to unfurl URL %s: %v", uri, err) 247 } else { 248 // Enhance embed with fetched metadata (only if client didn't provide) 249 // Note: We respect client-provided values, even empty strings 250 // If client sends title="", we assume they want no title 251 if external["title"] == nil { 252 external["title"] = result.Title 253 } 254 if external["description"] == nil { 255 external["description"] = result.Description 256 } 257 // Always set metadata fields (provider, domain, type) 258 external["embedType"] = result.Type 259 external["provider"] = result.Provider 260 external["domain"] = result.Domain 261 262 // Upload thumbnail from unfurl if client didn't provide one 263 // (Thumb validation already happened above) 264 if external["thumb"] == nil { 265 if result.ThumbnailURL != 
"" && s.blobService != nil { 266 blobCtx, blobCancel := context.WithTimeout(ctx, 15*time.Second) 267 defer blobCancel() 268 269 blob, blobErr := s.blobService.UploadBlobFromURL(blobCtx, community, result.ThumbnailURL) 270 if blobErr != nil { 271 log.Printf("[POST-CREATE] Warning: Failed to upload thumbnail for %s: %v", uri, blobErr) 272 } else { 273 external["thumb"] = blob 274 log.Printf("[POST-CREATE] Uploaded thumbnail blob for %s", uri) 275 } 276 } 277 } 278 279 log.Printf("[POST-CREATE] Successfully enhanced embed with unfurl data (provider: %s, type: %s)", 280 result.Provider, result.Type) 281 } 282 } 283 } 284 } 285 } 286 } 287 } 288 289 // 11. Write to community's PDS repository 290 uri, cid, err := s.createPostOnPDS(ctx, community, postRecord) 291 if err != nil { 292 return nil, fmt.Errorf("failed to write post to PDS: %w", err) 293 } 294 295 // 12. Record aggregator post for rate limiting (non-Kagi aggregators only) 296 // Kagi is exempted from rate limiting via env var (temporary) 297 if isOtherAggregator && s.aggregatorService != nil { 298 if recordErr := s.aggregatorService.RecordAggregatorPost(ctx, req.AuthorDID, communityDID, uri, cid); recordErr != nil { 299 // Log but don't fail - post was already created successfully 300 log.Printf("[POST-CREATE] Warning: failed to record aggregator post for rate limiting: %v", recordErr) 301 } 302 } 303 304 // 13. 
Return response (AppView will index via Jetstream consumer) 305 log.Printf("[POST-CREATE] Author: %s (trustedKagi=%v, otherAggregator=%v), Community: %s, URI: %s", 306 req.AuthorDID, isTrustedKagi, isOtherAggregator, communityDID, uri) 307 308 return &CreatePostResponse{ 309 URI: uri, 310 CID: cid, 311 }, nil 312} 313 314// validateCreateRequest validates basic input requirements 315func (s *postService) validateCreateRequest(req CreatePostRequest) error { 316 // Global content limits (from lexicon) 317 const ( 318 maxContentLength = 100000 // 100k characters - matches social.coves.community.post lexicon 319 maxTitleLength = 3000 // 3k bytes 320 maxTitleGraphemes = 300 // 300 graphemes (simplified check) 321 ) 322 323 // Validate community required 324 if req.Community == "" { 325 return NewValidationError("community", "community is required") 326 } 327 328 // Validate author DID set by handler 329 if req.AuthorDID == "" { 330 return NewValidationError("authorDid", "authorDid must be set from authenticated user") 331 } 332 333 // Validate content length 334 if req.Content != nil && len(*req.Content) > maxContentLength { 335 return NewValidationError("content", 336 fmt.Sprintf("content too long (max %d characters)", maxContentLength)) 337 } 338 339 // Validate title length 340 if req.Title != nil { 341 if len(*req.Title) > maxTitleLength { 342 return NewValidationError("title", 343 fmt.Sprintf("title too long (max %d bytes)", maxTitleLength)) 344 } 345 // Simplified grapheme check (actual implementation would need unicode library) 346 // For Alpha, byte length check is sufficient 347 } 348 349 // Validate content labels are from known values 350 if req.Labels != nil { 351 validLabels := map[string]bool{ 352 "nsfw": true, 353 "spoiler": true, 354 "violence": true, 355 } 356 for _, label := range req.Labels.Values { 357 if !validLabels[label.Val] { 358 return NewValidationError("labels", 359 fmt.Sprintf("unknown content label: %s (valid: nsfw, spoiler, violence)", 
label.Val)) 360 } 361 } 362 } 363 364 return nil 365} 366 367// createPostOnPDS writes a post record to the community's PDS repository 368// Uses com.atproto.repo.createRecord endpoint 369func (s *postService) createPostOnPDS( 370 ctx context.Context, 371 community *communities.Community, 372 record PostRecord, 373) (uri, cid string, err error) { 374 // Use community's PDS URL (not service default) for federated communities 375 // Each community can be hosted on a different PDS instance 376 pdsURL := community.PDSURL 377 if pdsURL == "" { 378 // Fallback to service default if community doesn't have a PDS URL 379 // (shouldn't happen in practice, but safe default) 380 pdsURL = s.pdsURL 381 } 382 383 // Build PDS endpoint URL 384 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", pdsURL) 385 386 // Build request payload 387 // IMPORTANT: repo is set to community DID, not author DID 388 // This writes the post to the community's repository 389 payload := map[string]interface{}{ 390 "repo": community.DID, // Community's repository 391 "collection": "social.coves.community.post", // Collection type 392 "record": record, // The post record 393 // "rkey" omitted - PDS will auto-generate TID 394 } 395 396 // Marshal payload 397 jsonData, err := json.Marshal(payload) 398 if err != nil { 399 return "", "", fmt.Errorf("failed to marshal post payload: %w", err) 400 } 401 402 // Create HTTP request 403 req, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewBuffer(jsonData)) 404 if err != nil { 405 return "", "", fmt.Errorf("failed to create PDS request: %w", err) 406 } 407 408 // Set headers (auth + content type) 409 req.Header.Set("Content-Type", "application/json") 410 req.Header.Set("Authorization", "Bearer "+community.PDSAccessToken) 411 412 // Extended timeout for write operations (30 seconds) 413 client := &http.Client{ 414 Timeout: 30 * time.Second, 415 } 416 417 // Execute request 418 resp, err := client.Do(req) 419 if err != nil { 420 
return "", "", fmt.Errorf("PDS request failed: %w", err) 421 } 422 defer func() { 423 if closeErr := resp.Body.Close(); closeErr != nil { 424 log.Printf("Warning: failed to close response body: %v", closeErr) 425 } 426 }() 427 428 // Read response body 429 body, err := io.ReadAll(resp.Body) 430 if err != nil { 431 return "", "", fmt.Errorf("failed to read PDS response: %w", err) 432 } 433 434 // Check for errors 435 if resp.StatusCode != http.StatusOK { 436 // Sanitize error body for logging (prevent sensitive data leakage) 437 bodyPreview := string(body) 438 if len(bodyPreview) > 200 { 439 bodyPreview = bodyPreview[:200] + "... (truncated)" 440 } 441 log.Printf("[POST-CREATE-ERROR] PDS Status: %d, Body: %s", resp.StatusCode, bodyPreview) 442 443 // Return truncated error (defense in depth - handler will mask this further) 444 return "", "", fmt.Errorf("PDS returned error %d: %s", resp.StatusCode, bodyPreview) 445 } 446 447 // Parse response 448 var result struct { 449 URI string `json:"uri"` 450 CID string `json:"cid"` 451 } 452 if err := json.Unmarshal(body, &result); err != nil { 453 return "", "", fmt.Errorf("failed to parse PDS response: %w", err) 454 } 455 456 return result.URI, result.CID, nil 457}