A community-based topic aggregation platform built on atproto
1package posts 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/core/aggregators" 6 "Coves/internal/core/communities" 7 "bytes" 8 "context" 9 "encoding/json" 10 "fmt" 11 "io" 12 "log" 13 "net/http" 14 "time" 15) 16 17type postService struct { 18 repo Repository 19 communityService communities.Service 20 aggregatorService aggregators.Service 21 pdsURL string 22} 23 24// NewPostService creates a new post service 25// aggregatorService can be nil if aggregator support is not needed (e.g., in tests or minimal setups) 26func NewPostService( 27 repo Repository, 28 communityService communities.Service, 29 aggregatorService aggregators.Service, // Optional: can be nil 30 pdsURL string, 31) Service { 32 return &postService{ 33 repo: repo, 34 communityService: communityService, 35 aggregatorService: aggregatorService, 36 pdsURL: pdsURL, 37 } 38} 39 40// CreatePost creates a new post in a community 41// Flow: 42// 1. Validate input 43// 2. Check if author is an aggregator (server-side validation using DID from JWT) 44// 3. If aggregator: validate authorization and rate limits, skip membership checks 45// 4. If user: resolve community and perform membership/ban validation 46// 5. Build post record 47// 6. Write to community's PDS repository 48// 7. If aggregator: record post for rate limiting 49// 8. Return URI/CID (AppView indexes asynchronously via Jetstream) 50func (s *postService) CreatePost(ctx context.Context, req CreatePostRequest) (*CreatePostResponse, error) { 51 // 1. 
SECURITY: Extract authenticated DID from context (set by JWT middleware) 52 // Defense-in-depth: verify service layer receives correct DID even if handler is bypassed 53 authenticatedDID := middleware.GetAuthenticatedDID(ctx) 54 if authenticatedDID == "" { 55 return nil, fmt.Errorf("no authenticated DID in context - authentication required") 56 } 57 58 // SECURITY: Verify request DID matches authenticated DID from JWT 59 // This prevents DID spoofing where a malicious client or compromised handler 60 // could provide a different DID than what was authenticated 61 if authenticatedDID != req.AuthorDID { 62 log.Printf("[SECURITY] DID mismatch: authenticated=%s, request=%s", authenticatedDID, req.AuthorDID) 63 return nil, fmt.Errorf("authenticated DID does not match author DID") 64 } 65 66 // 2. Validate basic input 67 if err := s.validateCreateRequest(req); err != nil { 68 return nil, err 69 } 70 71 // 3. SECURITY: Check if the authenticated DID is a registered aggregator 72 // This is server-side verification - we query the database to confirm 73 // the DID from the JWT corresponds to a registered aggregator service 74 // If aggregatorService is nil (tests or environments without aggregators), treat all posts as user posts 75 isAggregator := false 76 if s.aggregatorService != nil { 77 var err error 78 isAggregator, err = s.aggregatorService.IsAggregator(ctx, req.AuthorDID) 79 if err != nil { 80 return nil, fmt.Errorf("failed to check if author is aggregator: %w", err) 81 } 82 } 83 84 // 4. 
Resolve community at-identifier (handle or DID) to DID 85 // This accepts both formats per atProto best practices: 86 // - Handles: !gardening.communities.coves.social 87 // - DIDs: did:plc:abc123 or did:web:coves.social 88 communityDID, err := s.communityService.ResolveCommunityIdentifier(ctx, req.Community) 89 if err != nil { 90 // Handle specific error types appropriately 91 if communities.IsNotFound(err) { 92 return nil, ErrCommunityNotFound 93 } 94 if communities.IsValidationError(err) { 95 // Pass through validation errors (invalid format, etc.) 96 return nil, NewValidationError("community", err.Error()) 97 } 98 // Infrastructure failures (DB errors, network issues) should be internal errors 99 // Don't leak internal details to client (e.g., "pq: connection refused") 100 return nil, fmt.Errorf("failed to resolve community identifier: %w", err) 101 } 102 103 // 5. Fetch community from AppView (includes all metadata) 104 community, err := s.communityService.GetByDID(ctx, communityDID) 105 if err != nil { 106 if communities.IsNotFound(err) { 107 return nil, ErrCommunityNotFound 108 } 109 return nil, fmt.Errorf("failed to fetch community: %w", err) 110 } 111 112 // 6. 
Apply validation based on actor type (aggregator vs user) 113 if isAggregator { 114 // AGGREGATOR VALIDATION FLOW 115 // Following Bluesky's pattern: feed generators and labelers are authorized services 116 log.Printf("[POST-CREATE] Aggregator detected: %s posting to community: %s", req.AuthorDID, communityDID) 117 118 // Check authorization exists and is enabled, and verify rate limits 119 if err := s.aggregatorService.ValidateAggregatorPost(ctx, req.AuthorDID, communityDID); err != nil { 120 if aggregators.IsUnauthorized(err) { 121 return nil, ErrNotAuthorized 122 } 123 if aggregators.IsRateLimited(err) { 124 return nil, ErrRateLimitExceeded 125 } 126 return nil, fmt.Errorf("aggregator validation failed: %w", err) 127 } 128 129 // Aggregators skip membership checks and visibility restrictions 130 // They are authorized services, not community members 131 } else { 132 // USER VALIDATION FLOW 133 // Check community visibility (Alpha: public/unlisted only) 134 // Beta will add membership checks for private communities 135 if community.Visibility == "private" { 136 return nil, ErrNotAuthorized 137 } 138 } 139 140 // 7. Ensure community has fresh PDS credentials (token refresh if needed) 141 community, err = s.communityService.EnsureFreshToken(ctx, community) 142 if err != nil { 143 return nil, fmt.Errorf("failed to refresh community credentials: %w", err) 144 } 145 146 // 8. Build post record for PDS 147 postRecord := PostRecord{ 148 Type: "social.coves.post.record", 149 Community: communityDID, 150 Author: req.AuthorDID, 151 Title: req.Title, 152 Content: req.Content, 153 Facets: req.Facets, 154 Embed: req.Embed, 155 ContentLabels: req.ContentLabels, 156 OriginalAuthor: req.OriginalAuthor, 157 FederatedFrom: req.FederatedFrom, 158 Location: req.Location, 159 CreatedAt: time.Now().UTC().Format(time.RFC3339), 160 } 161 162 // 9. 
Write to community's PDS repository 163 uri, cid, err := s.createPostOnPDS(ctx, community, postRecord) 164 if err != nil { 165 return nil, fmt.Errorf("failed to write post to PDS: %w", err) 166 } 167 168 // 10. If aggregator, record post for rate limiting and statistics 169 if isAggregator && s.aggregatorService != nil { 170 if err := s.aggregatorService.RecordAggregatorPost(ctx, req.AuthorDID, communityDID, uri, cid); err != nil { 171 // Log error but don't fail the request (post was already created on PDS) 172 log.Printf("[POST-CREATE] Warning: failed to record aggregator post for rate limiting: %v", err) 173 } 174 } 175 176 // 11. Return response (AppView will index via Jetstream consumer) 177 log.Printf("[POST-CREATE] Author: %s (aggregator=%v), Community: %s, URI: %s", 178 req.AuthorDID, isAggregator, communityDID, uri) 179 180 return &CreatePostResponse{ 181 URI: uri, 182 CID: cid, 183 }, nil 184} 185 186// validateCreateRequest validates basic input requirements 187func (s *postService) validateCreateRequest(req CreatePostRequest) error { 188 // Global content limits (from lexicon) 189 const ( 190 maxContentLength = 50000 // 50k characters 191 maxTitleLength = 3000 // 3k bytes 192 maxTitleGraphemes = 300 // 300 graphemes (simplified check) 193 ) 194 195 // Validate community required 196 if req.Community == "" { 197 return NewValidationError("community", "community is required") 198 } 199 200 // Validate author DID set by handler 201 if req.AuthorDID == "" { 202 return NewValidationError("authorDid", "authorDid must be set from authenticated user") 203 } 204 205 // Validate content length 206 if req.Content != nil && len(*req.Content) > maxContentLength { 207 return NewValidationError("content", 208 fmt.Sprintf("content too long (max %d characters)", maxContentLength)) 209 } 210 211 // Validate title length 212 if req.Title != nil { 213 if len(*req.Title) > maxTitleLength { 214 return NewValidationError("title", 215 fmt.Sprintf("title too long (max %d 
bytes)", maxTitleLength)) 216 } 217 // Simplified grapheme check (actual implementation would need unicode library) 218 // For Alpha, byte length check is sufficient 219 } 220 221 // Validate content labels are from known values 222 validLabels := map[string]bool{ 223 "nsfw": true, 224 "spoiler": true, 225 "violence": true, 226 } 227 for _, label := range req.ContentLabels { 228 if !validLabels[label] { 229 return NewValidationError("contentLabels", 230 fmt.Sprintf("unknown content label: %s (valid: nsfw, spoiler, violence)", label)) 231 } 232 } 233 234 return nil 235} 236 237// createPostOnPDS writes a post record to the community's PDS repository 238// Uses com.atproto.repo.createRecord endpoint 239func (s *postService) createPostOnPDS( 240 ctx context.Context, 241 community *communities.Community, 242 record PostRecord, 243) (uri, cid string, err error) { 244 // Use community's PDS URL (not service default) for federated communities 245 // Each community can be hosted on a different PDS instance 246 pdsURL := community.PDSURL 247 if pdsURL == "" { 248 // Fallback to service default if community doesn't have a PDS URL 249 // (shouldn't happen in practice, but safe default) 250 pdsURL = s.pdsURL 251 } 252 253 // Build PDS endpoint URL 254 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", pdsURL) 255 256 // Build request payload 257 // IMPORTANT: repo is set to community DID, not author DID 258 // This writes the post to the community's repository 259 payload := map[string]interface{}{ 260 "repo": community.DID, // Community's repository 261 "collection": "social.coves.post.record", // Collection type 262 "record": record, // The post record 263 // "rkey" omitted - PDS will auto-generate TID 264 } 265 266 // Marshal payload 267 jsonData, err := json.Marshal(payload) 268 if err != nil { 269 return "", "", fmt.Errorf("failed to marshal post payload: %w", err) 270 } 271 272 // Create HTTP request 273 req, err := http.NewRequestWithContext(ctx, "POST", 
endpoint, bytes.NewBuffer(jsonData)) 274 if err != nil { 275 return "", "", fmt.Errorf("failed to create PDS request: %w", err) 276 } 277 278 // Set headers (auth + content type) 279 req.Header.Set("Content-Type", "application/json") 280 req.Header.Set("Authorization", "Bearer "+community.PDSAccessToken) 281 282 // Extended timeout for write operations (30 seconds) 283 client := &http.Client{ 284 Timeout: 30 * time.Second, 285 } 286 287 // Execute request 288 resp, err := client.Do(req) 289 if err != nil { 290 return "", "", fmt.Errorf("PDS request failed: %w", err) 291 } 292 defer func() { 293 if closeErr := resp.Body.Close(); closeErr != nil { 294 log.Printf("Warning: failed to close response body: %v", closeErr) 295 } 296 }() 297 298 // Read response body 299 body, err := io.ReadAll(resp.Body) 300 if err != nil { 301 return "", "", fmt.Errorf("failed to read PDS response: %w", err) 302 } 303 304 // Check for errors 305 if resp.StatusCode != http.StatusOK { 306 // Sanitize error body for logging (prevent sensitive data leakage) 307 bodyPreview := string(body) 308 if len(bodyPreview) > 200 { 309 bodyPreview = bodyPreview[:200] + "... (truncated)" 310 } 311 log.Printf("[POST-CREATE-ERROR] PDS Status: %d, Body: %s", resp.StatusCode, bodyPreview) 312 313 // Return truncated error (defense in depth - handler will mask this further) 314 return "", "", fmt.Errorf("PDS returned error %d: %s", resp.StatusCode, bodyPreview) 315 } 316 317 // Parse response 318 var result struct { 319 URI string `json:"uri"` 320 CID string `json:"cid"` 321 } 322 if err := json.Unmarshal(body, &result); err != nil { 323 return "", "", fmt.Errorf("failed to parse PDS response: %w", err) 324 } 325 326 return result.URI, result.CID, nil 327}