A community based topic aggregation platform built on atproto
1package posts
2
3import (
4 "Coves/internal/api/middleware"
5 "Coves/internal/core/aggregators"
6 "Coves/internal/core/communities"
7 "bytes"
8 "context"
9 "encoding/json"
10 "fmt"
11 "io"
12 "log"
13 "net/http"
14 "time"
15)
16
17type postService struct {
18 repo Repository
19 communityService communities.Service
20 aggregatorService aggregators.Service
21 pdsURL string
22}
23
24// NewPostService creates a new post service
25// aggregatorService can be nil if aggregator support is not needed (e.g., in tests or minimal setups)
26func NewPostService(
27 repo Repository,
28 communityService communities.Service,
29 aggregatorService aggregators.Service, // Optional: can be nil
30 pdsURL string,
31) Service {
32 return &postService{
33 repo: repo,
34 communityService: communityService,
35 aggregatorService: aggregatorService,
36 pdsURL: pdsURL,
37 }
38}
39
40// CreatePost creates a new post in a community
41// Flow:
42// 1. Validate input
43// 2. Check if author is an aggregator (server-side validation using DID from JWT)
44// 3. If aggregator: validate authorization and rate limits, skip membership checks
45// 4. If user: resolve community and perform membership/ban validation
46// 5. Build post record
47// 6. Write to community's PDS repository
48// 7. If aggregator: record post for rate limiting
49// 8. Return URI/CID (AppView indexes asynchronously via Jetstream)
50func (s *postService) CreatePost(ctx context.Context, req CreatePostRequest) (*CreatePostResponse, error) {
51 // 1. SECURITY: Extract authenticated DID from context (set by JWT middleware)
52 // Defense-in-depth: verify service layer receives correct DID even if handler is bypassed
53 authenticatedDID := middleware.GetAuthenticatedDID(ctx)
54 if authenticatedDID == "" {
55 return nil, fmt.Errorf("no authenticated DID in context - authentication required")
56 }
57
58 // SECURITY: Verify request DID matches authenticated DID from JWT
59 // This prevents DID spoofing where a malicious client or compromised handler
60 // could provide a different DID than what was authenticated
61 if authenticatedDID != req.AuthorDID {
62 log.Printf("[SECURITY] DID mismatch: authenticated=%s, request=%s", authenticatedDID, req.AuthorDID)
63 return nil, fmt.Errorf("authenticated DID does not match author DID")
64 }
65
66 // 2. Validate basic input
67 if err := s.validateCreateRequest(req); err != nil {
68 return nil, err
69 }
70
71 // 3. SECURITY: Check if the authenticated DID is a registered aggregator
72 // This is server-side verification - we query the database to confirm
73 // the DID from the JWT corresponds to a registered aggregator service
74 // If aggregatorService is nil (tests or environments without aggregators), treat all posts as user posts
75 isAggregator := false
76 if s.aggregatorService != nil {
77 var err error
78 isAggregator, err = s.aggregatorService.IsAggregator(ctx, req.AuthorDID)
79 if err != nil {
80 return nil, fmt.Errorf("failed to check if author is aggregator: %w", err)
81 }
82 }
83
84 // 4. Resolve community at-identifier (handle or DID) to DID
85 // This accepts both formats per atProto best practices:
86 // - Handles: !gardening.communities.coves.social
87 // - DIDs: did:plc:abc123 or did:web:coves.social
88 communityDID, err := s.communityService.ResolveCommunityIdentifier(ctx, req.Community)
89 if err != nil {
90 // Handle specific error types appropriately
91 if communities.IsNotFound(err) {
92 return nil, ErrCommunityNotFound
93 }
94 if communities.IsValidationError(err) {
95 // Pass through validation errors (invalid format, etc.)
96 return nil, NewValidationError("community", err.Error())
97 }
98 // Infrastructure failures (DB errors, network issues) should be internal errors
99 // Don't leak internal details to client (e.g., "pq: connection refused")
100 return nil, fmt.Errorf("failed to resolve community identifier: %w", err)
101 }
102
103 // 5. Fetch community from AppView (includes all metadata)
104 community, err := s.communityService.GetByDID(ctx, communityDID)
105 if err != nil {
106 if communities.IsNotFound(err) {
107 return nil, ErrCommunityNotFound
108 }
109 return nil, fmt.Errorf("failed to fetch community: %w", err)
110 }
111
112 // 6. Apply validation based on actor type (aggregator vs user)
113 if isAggregator {
114 // AGGREGATOR VALIDATION FLOW
115 // Following Bluesky's pattern: feed generators and labelers are authorized services
116 log.Printf("[POST-CREATE] Aggregator detected: %s posting to community: %s", req.AuthorDID, communityDID)
117
118 // Check authorization exists and is enabled, and verify rate limits
119 if err := s.aggregatorService.ValidateAggregatorPost(ctx, req.AuthorDID, communityDID); err != nil {
120 if aggregators.IsUnauthorized(err) {
121 return nil, ErrNotAuthorized
122 }
123 if aggregators.IsRateLimited(err) {
124 return nil, ErrRateLimitExceeded
125 }
126 return nil, fmt.Errorf("aggregator validation failed: %w", err)
127 }
128
129 // Aggregators skip membership checks and visibility restrictions
130 // They are authorized services, not community members
131 } else {
132 // USER VALIDATION FLOW
133 // Check community visibility (Alpha: public/unlisted only)
134 // Beta will add membership checks for private communities
135 if community.Visibility == "private" {
136 return nil, ErrNotAuthorized
137 }
138 }
139
140 // 7. Ensure community has fresh PDS credentials (token refresh if needed)
141 community, err = s.communityService.EnsureFreshToken(ctx, community)
142 if err != nil {
143 return nil, fmt.Errorf("failed to refresh community credentials: %w", err)
144 }
145
146 // 8. Build post record for PDS
147 postRecord := PostRecord{
148 Type: "social.coves.post.record",
149 Community: communityDID,
150 Author: req.AuthorDID,
151 Title: req.Title,
152 Content: req.Content,
153 Facets: req.Facets,
154 Embed: req.Embed,
155 ContentLabels: req.ContentLabels,
156 OriginalAuthor: req.OriginalAuthor,
157 FederatedFrom: req.FederatedFrom,
158 Location: req.Location,
159 CreatedAt: time.Now().UTC().Format(time.RFC3339),
160 }
161
162 // 9. Write to community's PDS repository
163 uri, cid, err := s.createPostOnPDS(ctx, community, postRecord)
164 if err != nil {
165 return nil, fmt.Errorf("failed to write post to PDS: %w", err)
166 }
167
168 // 10. If aggregator, record post for rate limiting and statistics
169 if isAggregator && s.aggregatorService != nil {
170 if err := s.aggregatorService.RecordAggregatorPost(ctx, req.AuthorDID, communityDID, uri, cid); err != nil {
171 // Log error but don't fail the request (post was already created on PDS)
172 log.Printf("[POST-CREATE] Warning: failed to record aggregator post for rate limiting: %v", err)
173 }
174 }
175
176 // 11. Return response (AppView will index via Jetstream consumer)
177 log.Printf("[POST-CREATE] Author: %s (aggregator=%v), Community: %s, URI: %s",
178 req.AuthorDID, isAggregator, communityDID, uri)
179
180 return &CreatePostResponse{
181 URI: uri,
182 CID: cid,
183 }, nil
184}
185
186// validateCreateRequest validates basic input requirements
187func (s *postService) validateCreateRequest(req CreatePostRequest) error {
188 // Global content limits (from lexicon)
189 const (
190 maxContentLength = 50000 // 50k characters
191 maxTitleLength = 3000 // 3k bytes
192 maxTitleGraphemes = 300 // 300 graphemes (simplified check)
193 )
194
195 // Validate community required
196 if req.Community == "" {
197 return NewValidationError("community", "community is required")
198 }
199
200 // Validate author DID set by handler
201 if req.AuthorDID == "" {
202 return NewValidationError("authorDid", "authorDid must be set from authenticated user")
203 }
204
205 // Validate content length
206 if req.Content != nil && len(*req.Content) > maxContentLength {
207 return NewValidationError("content",
208 fmt.Sprintf("content too long (max %d characters)", maxContentLength))
209 }
210
211 // Validate title length
212 if req.Title != nil {
213 if len(*req.Title) > maxTitleLength {
214 return NewValidationError("title",
215 fmt.Sprintf("title too long (max %d bytes)", maxTitleLength))
216 }
217 // Simplified grapheme check (actual implementation would need unicode library)
218 // For Alpha, byte length check is sufficient
219 }
220
221 // Validate content labels are from known values
222 validLabels := map[string]bool{
223 "nsfw": true,
224 "spoiler": true,
225 "violence": true,
226 }
227 for _, label := range req.ContentLabels {
228 if !validLabels[label] {
229 return NewValidationError("contentLabels",
230 fmt.Sprintf("unknown content label: %s (valid: nsfw, spoiler, violence)", label))
231 }
232 }
233
234 return nil
235}
236
237// createPostOnPDS writes a post record to the community's PDS repository
238// Uses com.atproto.repo.createRecord endpoint
239func (s *postService) createPostOnPDS(
240 ctx context.Context,
241 community *communities.Community,
242 record PostRecord,
243) (uri, cid string, err error) {
244 // Use community's PDS URL (not service default) for federated communities
245 // Each community can be hosted on a different PDS instance
246 pdsURL := community.PDSURL
247 if pdsURL == "" {
248 // Fallback to service default if community doesn't have a PDS URL
249 // (shouldn't happen in practice, but safe default)
250 pdsURL = s.pdsURL
251 }
252
253 // Build PDS endpoint URL
254 endpoint := fmt.Sprintf("%s/xrpc/com.atproto.repo.createRecord", pdsURL)
255
256 // Build request payload
257 // IMPORTANT: repo is set to community DID, not author DID
258 // This writes the post to the community's repository
259 payload := map[string]interface{}{
260 "repo": community.DID, // Community's repository
261 "collection": "social.coves.post.record", // Collection type
262 "record": record, // The post record
263 // "rkey" omitted - PDS will auto-generate TID
264 }
265
266 // Marshal payload
267 jsonData, err := json.Marshal(payload)
268 if err != nil {
269 return "", "", fmt.Errorf("failed to marshal post payload: %w", err)
270 }
271
272 // Create HTTP request
273 req, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewBuffer(jsonData))
274 if err != nil {
275 return "", "", fmt.Errorf("failed to create PDS request: %w", err)
276 }
277
278 // Set headers (auth + content type)
279 req.Header.Set("Content-Type", "application/json")
280 req.Header.Set("Authorization", "Bearer "+community.PDSAccessToken)
281
282 // Extended timeout for write operations (30 seconds)
283 client := &http.Client{
284 Timeout: 30 * time.Second,
285 }
286
287 // Execute request
288 resp, err := client.Do(req)
289 if err != nil {
290 return "", "", fmt.Errorf("PDS request failed: %w", err)
291 }
292 defer func() {
293 if closeErr := resp.Body.Close(); closeErr != nil {
294 log.Printf("Warning: failed to close response body: %v", closeErr)
295 }
296 }()
297
298 // Read response body
299 body, err := io.ReadAll(resp.Body)
300 if err != nil {
301 return "", "", fmt.Errorf("failed to read PDS response: %w", err)
302 }
303
304 // Check for errors
305 if resp.StatusCode != http.StatusOK {
306 // Sanitize error body for logging (prevent sensitive data leakage)
307 bodyPreview := string(body)
308 if len(bodyPreview) > 200 {
309 bodyPreview = bodyPreview[:200] + "... (truncated)"
310 }
311 log.Printf("[POST-CREATE-ERROR] PDS Status: %d, Body: %s", resp.StatusCode, bodyPreview)
312
313 // Return truncated error (defense in depth - handler will mask this further)
314 return "", "", fmt.Errorf("PDS returned error %d: %s", resp.StatusCode, bodyPreview)
315 }
316
317 // Parse response
318 var result struct {
319 URI string `json:"uri"`
320 CID string `json:"cid"`
321 }
322 if err := json.Unmarshal(body, &result); err != nil {
323 return "", "", fmt.Errorf("failed to parse PDS response: %w", err)
324 }
325
326 return result.URI, result.CID, nil
327}