A community-based topic aggregation platform built on atproto
1package posts
2
3import (
4 "Coves/internal/api/middleware"
5 "Coves/internal/core/aggregators"
6 "Coves/internal/core/communities"
7 "bytes"
8 "context"
9 "encoding/json"
10 "fmt"
11 "io"
12 "log"
13 "net/http"
14 "time"
15)
16
17type postService struct {
18 repo Repository
19 communityService communities.Service
20 aggregatorService aggregators.Service
21 pdsURL string
22}
23
24// NewPostService creates a new post service
25// aggregatorService can be nil if aggregator support is not needed (e.g., in tests or minimal setups)
26func NewPostService(
27 repo Repository,
28 communityService communities.Service,
29 aggregatorService aggregators.Service, // Optional: can be nil
30 pdsURL string,
31) Service {
32 return &postService{
33 repo: repo,
34 communityService: communityService,
35 aggregatorService: aggregatorService,
36 pdsURL: pdsURL,
37 }
38}
39
40// CreatePost creates a new post in a community
41// Flow:
42// 1. Validate input
43// 2. Check if author is an aggregator (server-side validation using DID from JWT)
44// 3. If aggregator: validate authorization and rate limits, skip membership checks
45// 4. If user: resolve community and perform membership/ban validation
46// 5. Build post record
47// 6. Write to community's PDS repository
48// 7. If aggregator: record post for rate limiting
49// 8. Return URI/CID (AppView indexes asynchronously via Jetstream)
50func (s *postService) CreatePost(ctx context.Context, req CreatePostRequest) (*CreatePostResponse, error) {
51 // 1. SECURITY: Extract authenticated DID from context (set by JWT middleware)
52 // Defense-in-depth: verify service layer receives correct DID even if handler is bypassed
53 authenticatedDID := middleware.GetAuthenticatedDID(ctx)
54 if authenticatedDID == "" {
55 return nil, fmt.Errorf("no authenticated DID in context - authentication required")
56 }
57
58 // SECURITY: Verify request DID matches authenticated DID from JWT
59 // This prevents DID spoofing where a malicious client or compromised handler
60 // could provide a different DID than what was authenticated
61 if authenticatedDID != req.AuthorDID {
62 log.Printf("[SECURITY] DID mismatch: authenticated=%s, request=%s", authenticatedDID, req.AuthorDID)
63 return nil, fmt.Errorf("authenticated DID does not match author DID")
64 }
65
66 // 2. Validate basic input
67 if err := s.validateCreateRequest(req); err != nil {
68 return nil, err
69 }
70
71 // 3. SECURITY: Check if the authenticated DID is a registered aggregator
72 // This is server-side verification - we query the database to confirm
73 // the DID from the JWT corresponds to a registered aggregator service
74 // If aggregatorService is nil (tests or environments without aggregators), treat all posts as user posts
75 isAggregator := false
76 if s.aggregatorService != nil {
77 var err error
78 isAggregator, err = s.aggregatorService.IsAggregator(ctx, req.AuthorDID)
79 if err != nil {
80 return nil, fmt.Errorf("failed to check if author is aggregator: %w", err)
81 }
82 }
83
84 // 4. Resolve community at-identifier (handle or DID) to DID
85 // This accepts both formats per atProto best practices:
86 // - Handles: !gardening.communities.coves.social
87 // - DIDs: did:plc:abc123 or did:web:coves.social
88 communityDID, err := s.communityService.ResolveCommunityIdentifier(ctx, req.Community)
89 if err != nil {
90 // Handle specific error types appropriately
91 if communities.IsNotFound(err) {
92 return nil, ErrCommunityNotFound
93 }
94 if communities.IsValidationError(err) {
95 // Pass through validation errors (invalid format, etc.)
96 return nil, NewValidationError("community", err.Error())
97 }
98 // Infrastructure failures (DB errors, network issues) should be internal errors
99 // Don't leak internal details to client (e.g., "pq: connection refused")
100 return nil, fmt.Errorf("failed to resolve community identifier: %w", err)
101 }
102
103 // 5. Fetch community from AppView (includes all metadata)
104 community, err := s.communityService.GetByDID(ctx, communityDID)
105 if err != nil {
106 if communities.IsNotFound(err) {
107 return nil, ErrCommunityNotFound
108 }
109 return nil, fmt.Errorf("failed to fetch community: %w", err)
110 }
111
112 // 6. Apply validation based on actor type (aggregator vs user)
113 if isAggregator {
114 // AGGREGATOR VALIDATION FLOW
115 // Following Bluesky's pattern: feed generators and labelers are authorized services
116 log.Printf("[POST-CREATE] Aggregator detected: %s posting to community: %s", req.AuthorDID, communityDID)
117
118 // Check authorization exists and is enabled, and verify rate limits
119 if err := s.aggregatorService.ValidateAggregatorPost(ctx, req.AuthorDID, communityDID); err != nil {
120 if aggregators.IsUnauthorized(err) {
121 return nil, ErrNotAuthorized
122 }
123 if aggregators.IsRateLimited(err) {
124 return nil, ErrRateLimitExceeded
125 }
126 return nil, fmt.Errorf("aggregator validation failed: %w", err)
127 }
128
129 // Aggregators skip membership checks and visibility restrictions
130 // They are authorized services, not community members
131 } else {
132 // USER VALIDATION FLOW
133 // Check community visibility (Alpha: public/unlisted only)
134 // Beta will add membership checks for private communities
135 if community.Visibility == "private" {
136 return nil, ErrNotAuthorized
137 }
138 }
139
140 // 7. Ensure community has fresh PDS credentials (token refresh if needed)
141 community, err = s.communityService.EnsureFreshToken(ctx, community)
142 if err != nil {
143 return nil, fmt.Errorf("failed to refresh community credentials: %w", err)
144 }
145
146 // 8. Build post record for PDS
147 postRecord := PostRecord{
148 Type: "social.coves.community.post",
149 Community: communityDID,
150 Author: req.AuthorDID,
151 Title: req.Title,
152 Content: req.Content,
153 Facets: req.Facets,
154 Embed: req.Embed,
155 Labels: req.Labels,
156 OriginalAuthor: req.OriginalAuthor,
157 FederatedFrom: req.FederatedFrom,
158 Location: req.Location,
159 CreatedAt: time.Now().UTC().Format(time.RFC3339),
160 }
161
162 // 9. Write to community's PDS repository
163 uri, cid, err := s.createPostOnPDS(ctx, community, postRecord)
164 if err != nil {
165 return nil, fmt.Errorf("failed to write post to PDS: %w", err)
166 }
167
168 // 10. If aggregator, record post for rate limiting and statistics
169 if isAggregator && s.aggregatorService != nil {
170 if err := s.aggregatorService.RecordAggregatorPost(ctx, req.AuthorDID, communityDID, uri, cid); err != nil {
171 // Log error but don't fail the request (post was already created on PDS)
172 log.Printf("[POST-CREATE] Warning: failed to record aggregator post for rate limiting: %v", err)
173 }
174 }
175
176 // 11. Return response (AppView will index via Jetstream consumer)
177 log.Printf("[POST-CREATE] Author: %s (aggregator=%v), Community: %s, URI: %s",
178 req.AuthorDID, isAggregator, communityDID, uri)
179
180 return &CreatePostResponse{
181 URI: uri,
182 CID: cid,
183 }, nil
184}
185
186// validateCreateRequest validates basic input requirements
187func (s *postService) validateCreateRequest(req CreatePostRequest) error {
188 // Global content limits (from lexicon)
189 const (
190 maxContentLength = 100000 // 100k characters - matches social.coves.community.post lexicon
191 maxTitleLength = 3000 // 3k bytes
192 maxTitleGraphemes = 300 // 300 graphemes (simplified check)
193 )
194
195 // Validate community required
196 if req.Community == "" {
197 return NewValidationError("community", "community is required")
198 }
199
200 // Validate author DID set by handler
201 if req.AuthorDID == "" {
202 return NewValidationError("authorDid", "authorDid must be set from authenticated user")
203 }
204
205 // Validate content length
206 if req.Content != nil && len(*req.Content) > maxContentLength {
207 return NewValidationError("content",
208 fmt.Sprintf("content too long (max %d characters)", maxContentLength))
209 }
210
211 // Validate title length
212 if req.Title != nil {
213 if len(*req.Title) > maxTitleLength {
214 return NewValidationError("title",
215 fmt.Sprintf("title too long (max %d bytes)", maxTitleLength))
216 }
217 // Simplified grapheme check (actual implementation would need unicode library)
218 // For Alpha, byte length check is sufficient
219 }
220
221 // Validate content labels are from known values
222 if req.Labels != nil {
223 validLabels := map[string]bool{
224 "nsfw": true,
225 "spoiler": true,
226 "violence": true,
227 }
228 for _, label := range req.Labels.Values {
229 if !validLabels[label.Val] {
230 return NewValidationError("labels",
231 fmt.Sprintf("unknown content label: %s (valid: nsfw, spoiler, violence)", label.Val))
232 }
233 }
234 }
235
236 return nil
237}
238
239// createPostOnPDS writes a post record to the community's PDS repository
240// Uses com.atproto.repo.createRecord endpoint
241func (s *postService) createPostOnPDS(
242 ctx context.Context,
243 community *communities.Community,
244 record PostRecord,
245) (uri, cid string, err error) {
246 // Use community's PDS URL (not service default) for federated communities
247 // Each community can be hosted on a different PDS instance
248 pdsURL := community.PDSURL
249 if pdsURL == "" {
250 // Fallback to service default if community doesn't have a PDS URL
251 // (shouldn't happen in practice, but safe default)
252 pdsURL = s.pdsURL
253 }
254
255 // Build PDS endpoint URL
256 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", pdsURL)
257
258 // Build request payload
259 // IMPORTANT: repo is set to community DID, not author DID
260 // This writes the post to the community's repository
261 payload := map[string]interface{}{
262 "repo": community.DID, // Community's repository
263 "collection": "social.coves.community.post", // Collection type
264 "record": record, // The post record
265 // "rkey" omitted - PDS will auto-generate TID
266 }
267
268 // Marshal payload
269 jsonData, err := json.Marshal(payload)
270 if err != nil {
271 return "", "", fmt.Errorf("failed to marshal post payload: %w", err)
272 }
273
274 // Create HTTP request
275 req, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewBuffer(jsonData))
276 if err != nil {
277 return "", "", fmt.Errorf("failed to create PDS request: %w", err)
278 }
279
280 // Set headers (auth + content type)
281 req.Header.Set("Content-Type", "application/json")
282 req.Header.Set("Authorization", "Bearer "+community.PDSAccessToken)
283
284 // Extended timeout for write operations (30 seconds)
285 client := &http.Client{
286 Timeout: 30 * time.Second,
287 }
288
289 // Execute request
290 resp, err := client.Do(req)
291 if err != nil {
292 return "", "", fmt.Errorf("PDS request failed: %w", err)
293 }
294 defer func() {
295 if closeErr := resp.Body.Close(); closeErr != nil {
296 log.Printf("Warning: failed to close response body: %v", closeErr)
297 }
298 }()
299
300 // Read response body
301 body, err := io.ReadAll(resp.Body)
302 if err != nil {
303 return "", "", fmt.Errorf("failed to read PDS response: %w", err)
304 }
305
306 // Check for errors
307 if resp.StatusCode != http.StatusOK {
308 // Sanitize error body for logging (prevent sensitive data leakage)
309 bodyPreview := string(body)
310 if len(bodyPreview) > 200 {
311 bodyPreview = bodyPreview[:200] + "... (truncated)"
312 }
313 log.Printf("[POST-CREATE-ERROR] PDS Status: %d, Body: %s", resp.StatusCode, bodyPreview)
314
315 // Return truncated error (defense in depth - handler will mask this further)
316 return "", "", fmt.Errorf("PDS returned error %d: %s", resp.StatusCode, bodyPreview)
317 }
318
319 // Parse response
320 var result struct {
321 URI string `json:"uri"`
322 CID string `json:"cid"`
323 }
324 if err := json.Unmarshal(body, &result); err != nil {
325 return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
326 }
327
328 return result.URI, result.CID, nil
329}