A community based topic aggregation platform built on atproto
1package posts 2 3import ( 4 "Coves/internal/api/middleware" 5 "Coves/internal/core/aggregators" 6 "Coves/internal/core/communities" 7 "bytes" 8 "context" 9 "encoding/json" 10 "fmt" 11 "io" 12 "log" 13 "net/http" 14 "time" 15) 16 17type postService struct { 18 repo Repository 19 communityService communities.Service 20 aggregatorService aggregators.Service 21 pdsURL string 22} 23 24// NewPostService creates a new post service 25// aggregatorService can be nil if aggregator support is not needed (e.g., in tests or minimal setups) 26func NewPostService( 27 repo Repository, 28 communityService communities.Service, 29 aggregatorService aggregators.Service, // Optional: can be nil 30 pdsURL string, 31) Service { 32 return &postService{ 33 repo: repo, 34 communityService: communityService, 35 aggregatorService: aggregatorService, 36 pdsURL: pdsURL, 37 } 38} 39 40// CreatePost creates a new post in a community 41// Flow: 42// 1. Validate input 43// 2. Check if author is an aggregator (server-side validation using DID from JWT) 44// 3. If aggregator: validate authorization and rate limits, skip membership checks 45// 4. If user: resolve community and perform membership/ban validation 46// 5. Build post record 47// 6. Write to community's PDS repository 48// 7. If aggregator: record post for rate limiting 49// 8. Return URI/CID (AppView indexes asynchronously via Jetstream) 50func (s *postService) CreatePost(ctx context.Context, req CreatePostRequest) (*CreatePostResponse, error) { 51 // 1. SECURITY: Extract authenticated DID from context (set by JWT middleware) 52 // Defense-in-depth: verify service layer receives correct DID even if handler is bypassed 53 authenticatedDID := middleware.GetAuthenticatedDID(ctx) 54 if authenticatedDID == "" { 55 return nil, fmt.Errorf("no authenticated DID in context - authentication required") 56 } 57 58 // SECURITY: Verify request DID matches authenticated DID from JWT 59 // This prevents DID spoofing where a malicious client or compromised handler 60 // could provide a different DID than what was authenticated 61 if authenticatedDID != req.AuthorDID { 62 log.Printf("[SECURITY] DID mismatch: authenticated=%s, request=%s", authenticatedDID, req.AuthorDID) 63 return nil, fmt.Errorf("authenticated DID does not match author DID") 64 } 65 66 // 2. Validate basic input 67 if err := s.validateCreateRequest(req); err != nil { 68 return nil, err 69 } 70 71 // 3. SECURITY: Check if the authenticated DID is a registered aggregator 72 // This is server-side verification - we query the database to confirm 73 // the DID from the JWT corresponds to a registered aggregator service 74 // If aggregatorService is nil (tests or environments without aggregators), treat all posts as user posts 75 isAggregator := false 76 if s.aggregatorService != nil { 77 var err error 78 isAggregator, err = s.aggregatorService.IsAggregator(ctx, req.AuthorDID) 79 if err != nil { 80 return nil, fmt.Errorf("failed to check if author is aggregator: %w", err) 81 } 82 } 83 84 // 4. Resolve community at-identifier (handle or DID) to DID 85 // This accepts both formats per atProto best practices: 86 // - Handles: !gardening.communities.coves.social 87 // - DIDs: did:plc:abc123 or did:web:coves.social 88 communityDID, err := s.communityService.ResolveCommunityIdentifier(ctx, req.Community) 89 if err != nil { 90 // Handle specific error types appropriately 91 if communities.IsNotFound(err) { 92 return nil, ErrCommunityNotFound 93 } 94 if communities.IsValidationError(err) { 95 // Pass through validation errors (invalid format, etc.) 96 return nil, NewValidationError("community", err.Error()) 97 } 98 // Infrastructure failures (DB errors, network issues) should be internal errors 99 // Don't leak internal details to client (e.g., "pq: connection refused") 100 return nil, fmt.Errorf("failed to resolve community identifier: %w", err) 101 } 102 103 // 5. Fetch community from AppView (includes all metadata) 104 community, err := s.communityService.GetByDID(ctx, communityDID) 105 if err != nil { 106 if communities.IsNotFound(err) { 107 return nil, ErrCommunityNotFound 108 } 109 return nil, fmt.Errorf("failed to fetch community: %w", err) 110 } 111 112 // 6. Apply validation based on actor type (aggregator vs user) 113 if isAggregator { 114 // AGGREGATOR VALIDATION FLOW 115 // Following Bluesky's pattern: feed generators and labelers are authorized services 116 log.Printf("[POST-CREATE] Aggregator detected: %s posting to community: %s", req.AuthorDID, communityDID) 117 118 // Check authorization exists and is enabled, and verify rate limits 119 if err := s.aggregatorService.ValidateAggregatorPost(ctx, req.AuthorDID, communityDID); err != nil { 120 if aggregators.IsUnauthorized(err) { 121 return nil, ErrNotAuthorized 122 } 123 if aggregators.IsRateLimited(err) { 124 return nil, ErrRateLimitExceeded 125 } 126 return nil, fmt.Errorf("aggregator validation failed: %w", err) 127 } 128 129 // Aggregators skip membership checks and visibility restrictions 130 // They are authorized services, not community members 131 } else { 132 // USER VALIDATION FLOW 133 // Check community visibility (Alpha: public/unlisted only) 134 // Beta will add membership checks for private communities 135 if community.Visibility == "private" { 136 return nil, ErrNotAuthorized 137 } 138 } 139 140 // 7. Ensure community has fresh PDS credentials (token refresh if needed) 141 community, err = s.communityService.EnsureFreshToken(ctx, community) 142 if err != nil { 143 return nil, fmt.Errorf("failed to refresh community credentials: %w", err) 144 } 145 146 // 8. Build post record for PDS 147 postRecord := PostRecord{ 148 Type: "social.coves.community.post", 149 Community: communityDID, 150 Author: req.AuthorDID, 151 Title: req.Title, 152 Content: req.Content, 153 Facets: req.Facets, 154 Embed: req.Embed, 155 Labels: req.Labels, 156 OriginalAuthor: req.OriginalAuthor, 157 FederatedFrom: req.FederatedFrom, 158 Location: req.Location, 159 CreatedAt: time.Now().UTC().Format(time.RFC3339), 160 } 161 162 // 9. Write to community's PDS repository 163 uri, cid, err := s.createPostOnPDS(ctx, community, postRecord) 164 if err != nil { 165 return nil, fmt.Errorf("failed to write post to PDS: %w", err) 166 } 167 168 // 10. If aggregator, record post for rate limiting and statistics 169 if isAggregator && s.aggregatorService != nil { 170 if err := s.aggregatorService.RecordAggregatorPost(ctx, req.AuthorDID, communityDID, uri, cid); err != nil { 171 // Log error but don't fail the request (post was already created on PDS) 172 log.Printf("[POST-CREATE] Warning: failed to record aggregator post for rate limiting: %v", err) 173 } 174 } 175 176 // 11. Return response (AppView will index via Jetstream consumer) 177 log.Printf("[POST-CREATE] Author: %s (aggregator=%v), Community: %s, URI: %s", 178 req.AuthorDID, isAggregator, communityDID, uri) 179 180 return &CreatePostResponse{ 181 URI: uri, 182 CID: cid, 183 }, nil 184} 185 186// validateCreateRequest validates basic input requirements 187func (s *postService) validateCreateRequest(req CreatePostRequest) error { 188 // Global content limits (from lexicon) 189 const ( 190 maxContentLength = 100000 // 100k characters - matches social.coves.community.post lexicon 191 maxTitleLength = 3000 // 3k bytes 192 maxTitleGraphemes = 300 // 300 graphemes (simplified check) 193 ) 194 195 // Validate community required 196 if req.Community == "" { 197 return NewValidationError("community", "community is required") 198 } 199 200 // Validate author DID set by handler 201 if req.AuthorDID == "" { 202 return NewValidationError("authorDid", "authorDid must be set from authenticated user") 203 } 204 205 // Validate content length 206 if req.Content != nil && len(*req.Content) > maxContentLength { 207 return NewValidationError("content", 208 fmt.Sprintf("content too long (max %d characters)", maxContentLength)) 209 } 210 211 // Validate title length 212 if req.Title != nil { 213 if len(*req.Title) > maxTitleLength { 214 return NewValidationError("title", 215 fmt.Sprintf("title too long (max %d bytes)", maxTitleLength)) 216 } 217 // Simplified grapheme check (actual implementation would need unicode library) 218 // For Alpha, byte length check is sufficient 219 } 220 221 // Validate content labels are from known values 222 if req.Labels != nil { 223 validLabels := map[string]bool{ 224 "nsfw": true, 225 "spoiler": true, 226 "violence": true, 227 } 228 for _, label := range req.Labels.Values { 229 if !validLabels[label.Val] { 230 return NewValidationError("labels", 231 fmt.Sprintf("unknown content label: %s (valid: nsfw, spoiler, violence)", label.Val)) 232 } 233 } 234 } 235 236 return nil 237} 238 239// createPostOnPDS writes a post record to the community's PDS repository 240// Uses com.atproto.repo.createRecord endpoint 241func (s *postService) createPostOnPDS( 242 ctx context.Context, 243 community *communities.Community, 244 record PostRecord, 245) (uri, cid string, err error) { 246 // Use community's PDS URL (not service default) for federated communities 247 // Each community can be hosted on a different PDS instance 248 pdsURL := community.PDSURL 249 if pdsURL == "" { 250 // Fallback to service default if community doesn't have a PDS URL 251 // (shouldn't happen in practice, but safe default) 252 pdsURL = s.pdsURL 253 } 254 255 // Build PDS endpoint URL 256 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", pdsURL) 257 258 // Build request payload 259 // IMPORTANT: repo is set to community DID, not author DID 260 // This writes the post to the community's repository 261 payload := map[string]interface{}{ 262 "repo": community.DID, // Community's repository 263 "collection": "social.coves.community.post", // Collection type 264 "record": record, // The post record 265 // "rkey" omitted - PDS will auto-generate TID 266 } 267 268 // Marshal payload 269 jsonData, err := json.Marshal(payload) 270 if err != nil { 271 return "", "", fmt.Errorf("failed to marshal post payload: %w", err) 272 } 273 274 // Create HTTP request 275 req, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewBuffer(jsonData)) 276 if err != nil { 277 return "", "", fmt.Errorf("failed to create PDS request: %w", err) 278 } 279 280 // Set headers (auth + content type) 281 req.Header.Set("Content-Type", "application/json") 282 req.Header.Set("Authorization", "Bearer "+community.PDSAccessToken) 283 284 // Extended timeout for write operations (30 seconds) 285 client := &http.Client{ 286 Timeout: 30 * time.Second, 287 } 288 289 // Execute request 290 resp, err := client.Do(req) 291 if err != nil { 292 return "", "", fmt.Errorf("PDS request failed: %w", err) 293 } 294 defer func() { 295 if closeErr := resp.Body.Close(); closeErr != nil { 296 log.Printf("Warning: failed to close response body: %v", closeErr) 297 } 298 }() 299 300 // Read response body 301 body, err := io.ReadAll(resp.Body) 302 if err != nil { 303 return "", "", fmt.Errorf("failed to read PDS response: %w", err) 304 } 305 306 // Check for errors 307 if resp.StatusCode != http.StatusOK { 308 // Sanitize error body for logging (prevent sensitive data leakage) 309 bodyPreview := string(body) 310 if len(bodyPreview) > 200 { 311 bodyPreview = bodyPreview[:200] + "... (truncated)" 312 } 313 log.Printf("[POST-CREATE-ERROR] PDS Status: %d, Body: %s", resp.StatusCode, bodyPreview) 314 315 // Return truncated error (defense in depth - handler will mask this further) 316 return "", "", fmt.Errorf("PDS returned error %d: %s", resp.StatusCode, bodyPreview) 317 } 318 319 // Parse response 320 var result struct { 321 URI string `json:"uri"` 322 CID string `json:"cid"` 323 } 324 if err := json.Unmarshal(body, &result); err != nil { 325 return "", "", fmt.Errorf("failed to parse PDS response: %w", err) 326 } 327 328 return result.URI, result.CID, nil 329}