A community based topic aggregation platform built on atproto
1package posts
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "log"
10 "net/http"
11 "time"
12
13 "Coves/internal/api/middleware"
14 "Coves/internal/core/aggregators"
15 "Coves/internal/core/communities"
16)
17
18type postService struct {
19 repo Repository
20 communityService communities.Service
21 aggregatorService aggregators.Service
22 pdsURL string
23}
24
25// NewPostService creates a new post service
26// aggregatorService can be nil if aggregator support is not needed (e.g., in tests or minimal setups)
27func NewPostService(
28 repo Repository,
29 communityService communities.Service,
30 aggregatorService aggregators.Service, // Optional: can be nil
31 pdsURL string,
32) Service {
33 return &postService{
34 repo: repo,
35 communityService: communityService,
36 aggregatorService: aggregatorService,
37 pdsURL: pdsURL,
38 }
39}
40
41// CreatePost creates a new post in a community
42// Flow:
43// 1. Validate input
44// 2. Check if author is an aggregator (server-side validation using DID from JWT)
45// 3. If aggregator: validate authorization and rate limits, skip membership checks
46// 4. If user: resolve community and perform membership/ban validation
47// 5. Build post record
48// 6. Write to community's PDS repository
49// 7. If aggregator: record post for rate limiting
50// 8. Return URI/CID (AppView indexes asynchronously via Jetstream)
51func (s *postService) CreatePost(ctx context.Context, req CreatePostRequest) (*CreatePostResponse, error) {
52 // 1. SECURITY: Extract authenticated DID from context (set by JWT middleware)
53 // Defense-in-depth: verify service layer receives correct DID even if handler is bypassed
54 authenticatedDID := middleware.GetAuthenticatedDID(ctx)
55 if authenticatedDID == "" {
56 return nil, fmt.Errorf("no authenticated DID in context - authentication required")
57 }
58
59 // SECURITY: Verify request DID matches authenticated DID from JWT
60 // This prevents DID spoofing where a malicious client or compromised handler
61 // could provide a different DID than what was authenticated
62 if authenticatedDID != req.AuthorDID {
63 log.Printf("[SECURITY] DID mismatch: authenticated=%s, request=%s", authenticatedDID, req.AuthorDID)
64 return nil, fmt.Errorf("authenticated DID does not match author DID")
65 }
66
67 // 2. Validate basic input
68 if err := s.validateCreateRequest(req); err != nil {
69 return nil, err
70 }
71
72 // 3. SECURITY: Check if the authenticated DID is a registered aggregator
73 // This is server-side verification - we query the database to confirm
74 // the DID from the JWT corresponds to a registered aggregator service
75 // If aggregatorService is nil (tests or environments without aggregators), treat all posts as user posts
76 isAggregator := false
77 if s.aggregatorService != nil {
78 var err error
79 isAggregator, err = s.aggregatorService.IsAggregator(ctx, req.AuthorDID)
80 if err != nil {
81 return nil, fmt.Errorf("failed to check if author is aggregator: %w", err)
82 }
83 }
84
85 // 4. Resolve community at-identifier (handle or DID) to DID
86 // This accepts both formats per atProto best practices:
87 // - Handles: !gardening.communities.coves.social
88 // - DIDs: did:plc:abc123 or did:web:coves.social
89 communityDID, err := s.communityService.ResolveCommunityIdentifier(ctx, req.Community)
90 if err != nil {
91 // Handle specific error types appropriately
92 if communities.IsNotFound(err) {
93 return nil, ErrCommunityNotFound
94 }
95 if communities.IsValidationError(err) {
96 // Pass through validation errors (invalid format, etc.)
97 return nil, NewValidationError("community", err.Error())
98 }
99 // Infrastructure failures (DB errors, network issues) should be internal errors
100 // Don't leak internal details to client (e.g., "pq: connection refused")
101 return nil, fmt.Errorf("failed to resolve community identifier: %w", err)
102 }
103
104 // 5. Fetch community from AppView (includes all metadata)
105 community, err := s.communityService.GetByDID(ctx, communityDID)
106 if err != nil {
107 if communities.IsNotFound(err) {
108 return nil, ErrCommunityNotFound
109 }
110 return nil, fmt.Errorf("failed to fetch community: %w", err)
111 }
112
113 // 6. Apply validation based on actor type (aggregator vs user)
114 if isAggregator {
115 // AGGREGATOR VALIDATION FLOW
116 // Following Bluesky's pattern: feed generators and labelers are authorized services
117 log.Printf("[POST-CREATE] Aggregator detected: %s posting to community: %s", req.AuthorDID, communityDID)
118
119 // Check authorization exists and is enabled, and verify rate limits
120 if err := s.aggregatorService.ValidateAggregatorPost(ctx, req.AuthorDID, communityDID); err != nil {
121 if aggregators.IsUnauthorized(err) {
122 return nil, ErrNotAuthorized
123 }
124 if aggregators.IsRateLimited(err) {
125 return nil, ErrRateLimitExceeded
126 }
127 return nil, fmt.Errorf("aggregator validation failed: %w", err)
128 }
129
130 // Aggregators skip membership checks and visibility restrictions
131 // They are authorized services, not community members
132 } else {
133 // USER VALIDATION FLOW
134 // Check community visibility (Alpha: public/unlisted only)
135 // Beta will add membership checks for private communities
136 if community.Visibility == "private" {
137 return nil, ErrNotAuthorized
138 }
139 }
140
141 // 7. Ensure community has fresh PDS credentials (token refresh if needed)
142 community, err = s.communityService.EnsureFreshToken(ctx, community)
143 if err != nil {
144 return nil, fmt.Errorf("failed to refresh community credentials: %w", err)
145 }
146
147 // 8. Build post record for PDS
148 postRecord := PostRecord{
149 Type: "social.coves.community.post",
150 Community: communityDID,
151 Author: req.AuthorDID,
152 Title: req.Title,
153 Content: req.Content,
154 Facets: req.Facets,
155 Embed: req.Embed,
156 Labels: req.Labels,
157 OriginalAuthor: req.OriginalAuthor,
158 FederatedFrom: req.FederatedFrom,
159 Location: req.Location,
160 CreatedAt: time.Now().UTC().Format(time.RFC3339),
161 }
162
163 // 9. Write to community's PDS repository
164 uri, cid, err := s.createPostOnPDS(ctx, community, postRecord)
165 if err != nil {
166 return nil, fmt.Errorf("failed to write post to PDS: %w", err)
167 }
168
169 // 10. If aggregator, record post for rate limiting and statistics
170 if isAggregator && s.aggregatorService != nil {
171 if err := s.aggregatorService.RecordAggregatorPost(ctx, req.AuthorDID, communityDID, uri, cid); err != nil {
172 // Log error but don't fail the request (post was already created on PDS)
173 log.Printf("[POST-CREATE] Warning: failed to record aggregator post for rate limiting: %v", err)
174 }
175 }
176
177 // 11. Return response (AppView will index via Jetstream consumer)
178 log.Printf("[POST-CREATE] Author: %s (aggregator=%v), Community: %s, URI: %s",
179 req.AuthorDID, isAggregator, communityDID, uri)
180
181 return &CreatePostResponse{
182 URI: uri,
183 CID: cid,
184 }, nil
185}
186
187// validateCreateRequest validates basic input requirements
188func (s *postService) validateCreateRequest(req CreatePostRequest) error {
189 // Global content limits (from lexicon)
190 const (
191 maxContentLength = 100000 // 100k characters - matches social.coves.community.post lexicon
192 maxTitleLength = 3000 // 3k bytes
193 maxTitleGraphemes = 300 // 300 graphemes (simplified check)
194 )
195
196 // Validate community required
197 if req.Community == "" {
198 return NewValidationError("community", "community is required")
199 }
200
201 // Validate author DID set by handler
202 if req.AuthorDID == "" {
203 return NewValidationError("authorDid", "authorDid must be set from authenticated user")
204 }
205
206 // Validate content length
207 if req.Content != nil && len(*req.Content) > maxContentLength {
208 return NewValidationError("content",
209 fmt.Sprintf("content too long (max %d characters)", maxContentLength))
210 }
211
212 // Validate title length
213 if req.Title != nil {
214 if len(*req.Title) > maxTitleLength {
215 return NewValidationError("title",
216 fmt.Sprintf("title too long (max %d bytes)", maxTitleLength))
217 }
218 // Simplified grapheme check (actual implementation would need unicode library)
219 // For Alpha, byte length check is sufficient
220 }
221
222 // Validate content labels are from known values
223 if req.Labels != nil {
224 validLabels := map[string]bool{
225 "nsfw": true,
226 "spoiler": true,
227 "violence": true,
228 }
229 for _, label := range req.Labels.Values {
230 if !validLabels[label.Val] {
231 return NewValidationError("labels",
232 fmt.Sprintf("unknown content label: %s (valid: nsfw, spoiler, violence)", label.Val))
233 }
234 }
235 }
236
237 return nil
238}
239
240// createPostOnPDS writes a post record to the community's PDS repository
241// Uses com.atproto.repo.createRecord endpoint
242func (s *postService) createPostOnPDS(
243 ctx context.Context,
244 community *communities.Community,
245 record PostRecord,
246) (uri, cid string, err error) {
247 // Use community's PDS URL (not service default) for federated communities
248 // Each community can be hosted on a different PDS instance
249 pdsURL := community.PDSURL
250 if pdsURL == "" {
251 // Fallback to service default if community doesn't have a PDS URL
252 // (shouldn't happen in practice, but safe default)
253 pdsURL = s.pdsURL
254 }
255
256 // Build PDS endpoint URL
257 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", pdsURL)
258
259 // Build request payload
260 // IMPORTANT: repo is set to community DID, not author DID
261 // This writes the post to the community's repository
262 payload := map[string]interface{}{
263 "repo": community.DID, // Community's repository
264 "collection": "social.coves.community.post", // Collection type
265 "record": record, // The post record
266 // "rkey" omitted - PDS will auto-generate TID
267 }
268
269 // Marshal payload
270 jsonData, err := json.Marshal(payload)
271 if err != nil {
272 return "", "", fmt.Errorf("failed to marshal post payload: %w", err)
273 }
274
275 // Create HTTP request
276 req, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewBuffer(jsonData))
277 if err != nil {
278 return "", "", fmt.Errorf("failed to create PDS request: %w", err)
279 }
280
281 // Set headers (auth + content type)
282 req.Header.Set("Content-Type", "application/json")
283 req.Header.Set("Authorization", "Bearer "+community.PDSAccessToken)
284
285 // Extended timeout for write operations (30 seconds)
286 client := &http.Client{
287 Timeout: 30 * time.Second,
288 }
289
290 // Execute request
291 resp, err := client.Do(req)
292 if err != nil {
293 return "", "", fmt.Errorf("PDS request failed: %w", err)
294 }
295 defer func() {
296 if closeErr := resp.Body.Close(); closeErr != nil {
297 log.Printf("Warning: failed to close response body: %v", closeErr)
298 }
299 }()
300
301 // Read response body
302 body, err := io.ReadAll(resp.Body)
303 if err != nil {
304 return "", "", fmt.Errorf("failed to read PDS response: %w", err)
305 }
306
307 // Check for errors
308 if resp.StatusCode != http.StatusOK {
309 // Sanitize error body for logging (prevent sensitive data leakage)
310 bodyPreview := string(body)
311 if len(bodyPreview) > 200 {
312 bodyPreview = bodyPreview[:200] + "... (truncated)"
313 }
314 log.Printf("[POST-CREATE-ERROR] PDS Status: %d, Body: %s", resp.StatusCode, bodyPreview)
315
316 // Return truncated error (defense in depth - handler will mask this further)
317 return "", "", fmt.Errorf("PDS returned error %d: %s", resp.StatusCode, bodyPreview)
318 }
319
320 // Parse response
321 var result struct {
322 URI string `json:"uri"`
323 CID string `json:"cid"`
324 }
325 if err := json.Unmarshal(body, &result); err != nil {
326 return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
327 }
328
329 return result.URI, result.CID, nil
330}