A community-based topic aggregation platform built on atproto
1package posts 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log" 10 "net/http" 11 "time" 12 13 "Coves/internal/api/middleware" 14 "Coves/internal/core/aggregators" 15 "Coves/internal/core/communities" 16) 17 18type postService struct { 19 repo Repository 20 communityService communities.Service 21 aggregatorService aggregators.Service 22 pdsURL string 23} 24 25// NewPostService creates a new post service 26// aggregatorService can be nil if aggregator support is not needed (e.g., in tests or minimal setups) 27func NewPostService( 28 repo Repository, 29 communityService communities.Service, 30 aggregatorService aggregators.Service, // Optional: can be nil 31 pdsURL string, 32) Service { 33 return &postService{ 34 repo: repo, 35 communityService: communityService, 36 aggregatorService: aggregatorService, 37 pdsURL: pdsURL, 38 } 39} 40 41// CreatePost creates a new post in a community 42// Flow: 43// 1. Validate input 44// 2. Check if author is an aggregator (server-side validation using DID from JWT) 45// 3. If aggregator: validate authorization and rate limits, skip membership checks 46// 4. If user: resolve community and perform membership/ban validation 47// 5. Build post record 48// 6. Write to community's PDS repository 49// 7. If aggregator: record post for rate limiting 50// 8. Return URI/CID (AppView indexes asynchronously via Jetstream) 51func (s *postService) CreatePost(ctx context.Context, req CreatePostRequest) (*CreatePostResponse, error) { 52 // 1. 
SECURITY: Extract authenticated DID from context (set by JWT middleware) 53 // Defense-in-depth: verify service layer receives correct DID even if handler is bypassed 54 authenticatedDID := middleware.GetAuthenticatedDID(ctx) 55 if authenticatedDID == "" { 56 return nil, fmt.Errorf("no authenticated DID in context - authentication required") 57 } 58 59 // SECURITY: Verify request DID matches authenticated DID from JWT 60 // This prevents DID spoofing where a malicious client or compromised handler 61 // could provide a different DID than what was authenticated 62 if authenticatedDID != req.AuthorDID { 63 log.Printf("[SECURITY] DID mismatch: authenticated=%s, request=%s", authenticatedDID, req.AuthorDID) 64 return nil, fmt.Errorf("authenticated DID does not match author DID") 65 } 66 67 // 2. Validate basic input 68 if err := s.validateCreateRequest(req); err != nil { 69 return nil, err 70 } 71 72 // 3. SECURITY: Check if the authenticated DID is a registered aggregator 73 // This is server-side verification - we query the database to confirm 74 // the DID from the JWT corresponds to a registered aggregator service 75 // If aggregatorService is nil (tests or environments without aggregators), treat all posts as user posts 76 isAggregator := false 77 if s.aggregatorService != nil { 78 var err error 79 isAggregator, err = s.aggregatorService.IsAggregator(ctx, req.AuthorDID) 80 if err != nil { 81 return nil, fmt.Errorf("failed to check if author is aggregator: %w", err) 82 } 83 } 84 85 // 4. 
Resolve community at-identifier (handle or DID) to DID 86 // This accepts both formats per atProto best practices: 87 // - Handles: !gardening.communities.coves.social 88 // - DIDs: did:plc:abc123 or did:web:coves.social 89 communityDID, err := s.communityService.ResolveCommunityIdentifier(ctx, req.Community) 90 if err != nil { 91 // Handle specific error types appropriately 92 if communities.IsNotFound(err) { 93 return nil, ErrCommunityNotFound 94 } 95 if communities.IsValidationError(err) { 96 // Pass through validation errors (invalid format, etc.) 97 return nil, NewValidationError("community", err.Error()) 98 } 99 // Infrastructure failures (DB errors, network issues) should be internal errors 100 // Don't leak internal details to client (e.g., "pq: connection refused") 101 return nil, fmt.Errorf("failed to resolve community identifier: %w", err) 102 } 103 104 // 5. Fetch community from AppView (includes all metadata) 105 community, err := s.communityService.GetByDID(ctx, communityDID) 106 if err != nil { 107 if communities.IsNotFound(err) { 108 return nil, ErrCommunityNotFound 109 } 110 return nil, fmt.Errorf("failed to fetch community: %w", err) 111 } 112 113 // 6. 
Apply validation based on actor type (aggregator vs user) 114 if isAggregator { 115 // AGGREGATOR VALIDATION FLOW 116 // Following Bluesky's pattern: feed generators and labelers are authorized services 117 log.Printf("[POST-CREATE] Aggregator detected: %s posting to community: %s", req.AuthorDID, communityDID) 118 119 // Check authorization exists and is enabled, and verify rate limits 120 if err := s.aggregatorService.ValidateAggregatorPost(ctx, req.AuthorDID, communityDID); err != nil { 121 if aggregators.IsUnauthorized(err) { 122 return nil, ErrNotAuthorized 123 } 124 if aggregators.IsRateLimited(err) { 125 return nil, ErrRateLimitExceeded 126 } 127 return nil, fmt.Errorf("aggregator validation failed: %w", err) 128 } 129 130 // Aggregators skip membership checks and visibility restrictions 131 // They are authorized services, not community members 132 } else { 133 // USER VALIDATION FLOW 134 // Check community visibility (Alpha: public/unlisted only) 135 // Beta will add membership checks for private communities 136 if community.Visibility == "private" { 137 return nil, ErrNotAuthorized 138 } 139 } 140 141 // 7. Ensure community has fresh PDS credentials (token refresh if needed) 142 community, err = s.communityService.EnsureFreshToken(ctx, community) 143 if err != nil { 144 return nil, fmt.Errorf("failed to refresh community credentials: %w", err) 145 } 146 147 // 8. Build post record for PDS 148 postRecord := PostRecord{ 149 Type: "social.coves.community.post", 150 Community: communityDID, 151 Author: req.AuthorDID, 152 Title: req.Title, 153 Content: req.Content, 154 Facets: req.Facets, 155 Embed: req.Embed, 156 Labels: req.Labels, 157 OriginalAuthor: req.OriginalAuthor, 158 FederatedFrom: req.FederatedFrom, 159 Location: req.Location, 160 CreatedAt: time.Now().UTC().Format(time.RFC3339), 161 } 162 163 // 9. 
Write to community's PDS repository 164 uri, cid, err := s.createPostOnPDS(ctx, community, postRecord) 165 if err != nil { 166 return nil, fmt.Errorf("failed to write post to PDS: %w", err) 167 } 168 169 // 10. If aggregator, record post for rate limiting and statistics 170 if isAggregator && s.aggregatorService != nil { 171 if err := s.aggregatorService.RecordAggregatorPost(ctx, req.AuthorDID, communityDID, uri, cid); err != nil { 172 // Log error but don't fail the request (post was already created on PDS) 173 log.Printf("[POST-CREATE] Warning: failed to record aggregator post for rate limiting: %v", err) 174 } 175 } 176 177 // 11. Return response (AppView will index via Jetstream consumer) 178 log.Printf("[POST-CREATE] Author: %s (aggregator=%v), Community: %s, URI: %s", 179 req.AuthorDID, isAggregator, communityDID, uri) 180 181 return &CreatePostResponse{ 182 URI: uri, 183 CID: cid, 184 }, nil 185} 186 187// validateCreateRequest validates basic input requirements 188func (s *postService) validateCreateRequest(req CreatePostRequest) error { 189 // Global content limits (from lexicon) 190 const ( 191 maxContentLength = 100000 // 100k characters - matches social.coves.community.post lexicon 192 maxTitleLength = 3000 // 3k bytes 193 maxTitleGraphemes = 300 // 300 graphemes (simplified check) 194 ) 195 196 // Validate community required 197 if req.Community == "" { 198 return NewValidationError("community", "community is required") 199 } 200 201 // Validate author DID set by handler 202 if req.AuthorDID == "" { 203 return NewValidationError("authorDid", "authorDid must be set from authenticated user") 204 } 205 206 // Validate content length 207 if req.Content != nil && len(*req.Content) > maxContentLength { 208 return NewValidationError("content", 209 fmt.Sprintf("content too long (max %d characters)", maxContentLength)) 210 } 211 212 // Validate title length 213 if req.Title != nil { 214 if len(*req.Title) > maxTitleLength { 215 return 
NewValidationError("title", 216 fmt.Sprintf("title too long (max %d bytes)", maxTitleLength)) 217 } 218 // Simplified grapheme check (actual implementation would need unicode library) 219 // For Alpha, byte length check is sufficient 220 } 221 222 // Validate content labels are from known values 223 if req.Labels != nil { 224 validLabels := map[string]bool{ 225 "nsfw": true, 226 "spoiler": true, 227 "violence": true, 228 } 229 for _, label := range req.Labels.Values { 230 if !validLabels[label.Val] { 231 return NewValidationError("labels", 232 fmt.Sprintf("unknown content label: %s (valid: nsfw, spoiler, violence)", label.Val)) 233 } 234 } 235 } 236 237 return nil 238} 239 240// createPostOnPDS writes a post record to the community's PDS repository 241// Uses com.atproto.repo.createRecord endpoint 242func (s *postService) createPostOnPDS( 243 ctx context.Context, 244 community *communities.Community, 245 record PostRecord, 246) (uri, cid string, err error) { 247 // Use community's PDS URL (not service default) for federated communities 248 // Each community can be hosted on a different PDS instance 249 pdsURL := community.PDSURL 250 if pdsURL == "" { 251 // Fallback to service default if community doesn't have a PDS URL 252 // (shouldn't happen in practice, but safe default) 253 pdsURL = s.pdsURL 254 } 255 256 // Build PDS endpoint URL 257 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", pdsURL) 258 259 // Build request payload 260 // IMPORTANT: repo is set to community DID, not author DID 261 // This writes the post to the community's repository 262 payload := map[string]interface{}{ 263 "repo": community.DID, // Community's repository 264 "collection": "social.coves.community.post", // Collection type 265 "record": record, // The post record 266 // "rkey" omitted - PDS will auto-generate TID 267 } 268 269 // Marshal payload 270 jsonData, err := json.Marshal(payload) 271 if err != nil { 272 return "", "", fmt.Errorf("failed to marshal post payload: 
%w", err) 273 } 274 275 // Create HTTP request 276 req, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewBuffer(jsonData)) 277 if err != nil { 278 return "", "", fmt.Errorf("failed to create PDS request: %w", err) 279 } 280 281 // Set headers (auth + content type) 282 req.Header.Set("Content-Type", "application/json") 283 req.Header.Set("Authorization", "Bearer "+community.PDSAccessToken) 284 285 // Extended timeout for write operations (30 seconds) 286 client := &http.Client{ 287 Timeout: 30 * time.Second, 288 } 289 290 // Execute request 291 resp, err := client.Do(req) 292 if err != nil { 293 return "", "", fmt.Errorf("PDS request failed: %w", err) 294 } 295 defer func() { 296 if closeErr := resp.Body.Close(); closeErr != nil { 297 log.Printf("Warning: failed to close response body: %v", closeErr) 298 } 299 }() 300 301 // Read response body 302 body, err := io.ReadAll(resp.Body) 303 if err != nil { 304 return "", "", fmt.Errorf("failed to read PDS response: %w", err) 305 } 306 307 // Check for errors 308 if resp.StatusCode != http.StatusOK { 309 // Sanitize error body for logging (prevent sensitive data leakage) 310 bodyPreview := string(body) 311 if len(bodyPreview) > 200 { 312 bodyPreview = bodyPreview[:200] + "... (truncated)" 313 } 314 log.Printf("[POST-CREATE-ERROR] PDS Status: %d, Body: %s", resp.StatusCode, bodyPreview) 315 316 // Return truncated error (defense in depth - handler will mask this further) 317 return "", "", fmt.Errorf("PDS returned error %d: %s", resp.StatusCode, bodyPreview) 318 } 319 320 // Parse response 321 var result struct { 322 URI string `json:"uri"` 323 CID string `json:"cid"` 324 } 325 if err := json.Unmarshal(body, &result); err != nil { 326 return "", "", fmt.Errorf("failed to parse PDS response: %w", err) 327 } 328 329 return result.URI, result.CID, nil 330}