A community-based topic aggregation platform built on atproto.
1package jetstream
2
import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"strings"
	"time"

	"Coves/internal/core/communities"
	"Coves/internal/core/posts"
	"Coves/internal/core/users"
)
15
16// PostEventConsumer consumes post-related events from Jetstream
17// Currently handles only CREATE operations for social.coves.community.post
18// UPDATE and DELETE handlers will be added when those features are implemented
19type PostEventConsumer struct {
20 postRepo posts.Repository
21 communityRepo communities.Repository
22 userService users.UserService
23 db *sql.DB // Direct DB access for atomic count reconciliation
24}
25
26// NewPostEventConsumer creates a new Jetstream consumer for post events
27func NewPostEventConsumer(
28 postRepo posts.Repository,
29 communityRepo communities.Repository,
30 userService users.UserService,
31 db *sql.DB,
32) *PostEventConsumer {
33 return &PostEventConsumer{
34 postRepo: postRepo,
35 communityRepo: communityRepo,
36 userService: userService,
37 db: db,
38 }
39}
40
41// HandleEvent processes a Jetstream event for post records
42// Currently only handles CREATE operations - UPDATE/DELETE deferred until those features exist
43func (c *PostEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
44 // We only care about commit events for post records
45 if event.Kind != "commit" || event.Commit == nil {
46 return nil
47 }
48
49 commit := event.Commit
50
51 // Only handle post record creation for now
52 // UPDATE and DELETE will be added when we implement those features
53 if commit.Collection == "social.coves.community.post" && commit.Operation == "create" {
54 return c.createPost(ctx, event.Did, commit)
55 }
56
57 // Silently ignore other operations (update, delete) and other collections
58 return nil
59}
60
61// createPost indexes a new post from the firehose
62func (c *PostEventConsumer) createPost(ctx context.Context, repoDID string, commit *CommitEvent) error {
63 if commit.Record == nil {
64 return fmt.Errorf("post create event missing record data")
65 }
66
67 // Parse the post record
68 postRecord, err := parsePostRecord(commit.Record)
69 if err != nil {
70 return fmt.Errorf("failed to parse post record: %w", err)
71 }
72
73 // SECURITY: Validate this is a legitimate post event
74 if err := c.validatePostEvent(ctx, repoDID, postRecord); err != nil {
75 log.Printf("🚨 SECURITY: Rejecting post event: %v", err)
76 return err
77 }
78
79 // Build AT-URI for this post
80 // Format: at://community_did/social.coves.community.post/rkey
81 uri := fmt.Sprintf("at://%s/social.coves.community.post/%s", repoDID, commit.RKey)
82
83 // Parse timestamp from record
84 createdAt, err := time.Parse(time.RFC3339, postRecord.CreatedAt)
85 if err != nil {
86 // Fallback to current time if parsing fails
87 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err)
88 createdAt = time.Now()
89 }
90
91 // Build post entity
92 post := &posts.Post{
93 URI: uri,
94 CID: commit.CID,
95 RKey: commit.RKey,
96 AuthorDID: postRecord.Author,
97 CommunityDID: postRecord.Community,
98 Title: postRecord.Title,
99 Content: postRecord.Content,
100 CreatedAt: createdAt,
101 IndexedAt: time.Now(),
102 // Stats remain at 0 (no votes yet)
103 UpvoteCount: 0,
104 DownvoteCount: 0,
105 Score: 0,
106 CommentCount: 0,
107 }
108
109 // Serialize JSON fields (facets, embed, labels)
110 if postRecord.Facets != nil {
111 facetsJSON, marshalErr := json.Marshal(postRecord.Facets)
112 if marshalErr == nil {
113 facetsStr := string(facetsJSON)
114 post.ContentFacets = &facetsStr
115 }
116 }
117
118 if postRecord.Embed != nil {
119 embedJSON, marshalErr := json.Marshal(postRecord.Embed)
120 if marshalErr == nil {
121 embedStr := string(embedJSON)
122 post.Embed = &embedStr
123 }
124 }
125
126 if postRecord.Labels != nil {
127 labelsJSON, marshalErr := json.Marshal(postRecord.Labels)
128 if marshalErr == nil {
129 labelsStr := string(labelsJSON)
130 post.ContentLabels = &labelsStr
131 }
132 }
133
134 // Atomically: Index post + Reconcile comment count for out-of-order arrivals
135 if err := c.indexPostAndReconcileCounts(ctx, post); err != nil {
136 return fmt.Errorf("failed to index post and reconcile counts: %w", err)
137 }
138
139 log.Printf("✓ Indexed post: %s (author: %s, community: %s, rkey: %s)",
140 uri, post.AuthorDID, post.CommunityDID, commit.RKey)
141 return nil
142}
143
144// indexPostAndReconcileCounts atomically indexes a post and reconciles comment counts
145// This fixes the race condition where comments arrive before their parent post
146func (c *PostEventConsumer) indexPostAndReconcileCounts(ctx context.Context, post *posts.Post) error {
147 tx, err := c.db.BeginTx(ctx, nil)
148 if err != nil {
149 return fmt.Errorf("failed to begin transaction: %w", err)
150 }
151 defer func() {
152 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
153 log.Printf("Failed to rollback transaction: %v", rollbackErr)
154 }
155 }()
156
157 // 1. Insert the post (idempotent with RETURNING clause)
158 var facetsJSON, embedJSON, labelsJSON sql.NullString
159
160 if post.ContentFacets != nil {
161 facetsJSON.String = *post.ContentFacets
162 facetsJSON.Valid = true
163 }
164
165 if post.Embed != nil {
166 embedJSON.String = *post.Embed
167 embedJSON.Valid = true
168 }
169
170 if post.ContentLabels != nil {
171 labelsJSON.String = *post.ContentLabels
172 labelsJSON.Valid = true
173 }
174
175 insertQuery := `
176 INSERT INTO posts (
177 uri, cid, rkey, author_did, community_did,
178 title, content, content_facets, embed, content_labels,
179 created_at, indexed_at
180 ) VALUES (
181 $1, $2, $3, $4, $5,
182 $6, $7, $8, $9, $10,
183 $11, NOW()
184 )
185 ON CONFLICT (uri) DO NOTHING
186 RETURNING id
187 `
188
189 var postID int64
190 insertErr := tx.QueryRowContext(
191 ctx, insertQuery,
192 post.URI, post.CID, post.RKey, post.AuthorDID, post.CommunityDID,
193 post.Title, post.Content, facetsJSON, embedJSON, labelsJSON,
194 post.CreatedAt,
195 ).Scan(&postID)
196
197 // If no rows returned, post already exists (idempotent - OK for Jetstream replays)
198 if insertErr == sql.ErrNoRows {
199 log.Printf("Post already indexed: %s (idempotent)", post.URI)
200 if commitErr := tx.Commit(); commitErr != nil {
201 return fmt.Errorf("failed to commit transaction: %w", commitErr)
202 }
203 return nil
204 }
205
206 if insertErr != nil {
207 return fmt.Errorf("failed to insert post: %w", insertErr)
208 }
209
210 // 2. Reconcile comment_count for this newly inserted post
211 // In case any comments arrived out-of-order before this post was indexed
212 // This is the CRITICAL FIX for the race condition identified in the PR review
213 reconcileQuery := `
214 UPDATE posts
215 SET comment_count = (
216 SELECT COUNT(*)
217 FROM comments c
218 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL
219 )
220 WHERE id = $2
221 `
222 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, post.URI, postID)
223 if reconcileErr != nil {
224 log.Printf("Warning: Failed to reconcile comment_count for %s: %v", post.URI, reconcileErr)
225 // Continue anyway - this is a best-effort reconciliation
226 }
227
228 // Commit transaction
229 if err := tx.Commit(); err != nil {
230 return fmt.Errorf("failed to commit transaction: %w", err)
231 }
232
233 return nil
234}
235
236// validatePostEvent performs security validation on post events
237// This prevents malicious actors from indexing fake posts
238func (c *PostEventConsumer) validatePostEvent(ctx context.Context, repoDID string, post *PostRecordFromJetstream) error {
239 // CRITICAL SECURITY CHECK:
240 // Posts MUST come from community repositories, not user repositories
241 // This prevents users from creating posts that appear to be from communities they don't control
242 //
243 // Example attack prevented:
244 // - User creates post in their own repo (at://user_did/social.coves.community.post/xyz)
245 // - Claims it's for community X (community field = community_did)
246 // - Without this check, fake post would be indexed
247 //
248 // With this check:
249 // - We verify event.Did (repo owner) == post.community (claimed community)
250 // - Reject if mismatch
251 if repoDID != post.Community {
252 return fmt.Errorf("repository DID (%s) doesn't match community DID (%s) - posts must come from community repos",
253 repoDID, post.Community)
254 }
255
256 // CRITICAL: Verify community exists in AppView
257 // Posts MUST reference valid communities (enforced by FK constraint)
258 // If community isn't indexed yet, we must reject the post
259 // Jetstream will replay events, so the post will be indexed once community is ready
260 _, err := c.communityRepo.GetByDID(ctx, post.Community)
261 if err != nil {
262 if communities.IsNotFound(err) {
263 // Reject - community must be indexed before posts
264 // This maintains referential integrity and prevents orphaned posts
265 return fmt.Errorf("community not found: %s - cannot index post before community", post.Community)
266 }
267 // Database error or other issue
268 return fmt.Errorf("failed to verify community exists: %w", err)
269 }
270
271 // CRITICAL: Verify author exists in AppView
272 // Every post MUST have a valid author (enforced by FK constraint)
273 // Even though posts live in community repos, they belong to specific authors
274 // If author isn't indexed yet, we must reject the post
275 _, err = c.userService.GetUserByDID(ctx, post.Author)
276 if err != nil {
277 // Check if it's a "not found" error using string matching
278 // (users package doesn't export IsNotFound)
279 if err.Error() == "user not found" || strings.Contains(err.Error(), "not found") {
280 // Reject - author must be indexed before posts
281 // This maintains referential integrity and prevents orphaned posts
282 return fmt.Errorf("author not found: %s - cannot index post before author", post.Author)
283 }
284 // Database error or other issue
285 return fmt.Errorf("failed to verify author exists: %w", err)
286 }
287
288 return nil
289}
290
291// PostRecordFromJetstream represents a post record as received from Jetstream
292// Matches the structure written to PDS via social.coves.community.post
293type PostRecordFromJetstream struct {
294 OriginalAuthor interface{} `json:"originalAuthor,omitempty"`
295 FederatedFrom interface{} `json:"federatedFrom,omitempty"`
296 Location interface{} `json:"location,omitempty"`
297 Title *string `json:"title,omitempty"`
298 Content *string `json:"content,omitempty"`
299 Embed map[string]interface{} `json:"embed,omitempty"`
300 Labels *posts.SelfLabels `json:"labels,omitempty"`
301 Type string `json:"$type"`
302 Community string `json:"community"`
303 Author string `json:"author"`
304 CreatedAt string `json:"createdAt"`
305 Facets []interface{} `json:"facets,omitempty"`
306}
307
308// parsePostRecord converts a raw Jetstream record map to a PostRecordFromJetstream
309func parsePostRecord(record map[string]interface{}) (*PostRecordFromJetstream, error) {
310 // Marshal to JSON and back to ensure proper type conversion
311 recordJSON, err := json.Marshal(record)
312 if err != nil {
313 return nil, fmt.Errorf("failed to marshal record: %w", err)
314 }
315
316 var post PostRecordFromJetstream
317 if err := json.Unmarshal(recordJSON, &post); err != nil {
318 return nil, fmt.Errorf("failed to unmarshal post record: %w", err)
319 }
320
321 // Validate required fields
322 if post.Community == "" {
323 return nil, fmt.Errorf("post record missing community field")
324 }
325 if post.Author == "" {
326 return nil, fmt.Errorf("post record missing author field")
327 }
328 if post.CreatedAt == "" {
329 return nil, fmt.Errorf("post record missing createdAt field")
330 }
331
332 return &post, nil
333}