A community-based topic aggregation platform built on atproto.
1package jetstream
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "fmt"
8 "log"
9 "strings"
10 "time"
11
12 "Coves/internal/core/communities"
13 "Coves/internal/core/posts"
14 "Coves/internal/core/users"
15)
16
17// PostEventConsumer consumes post-related events from Jetstream
18// Currently handles only CREATE operations for social.coves.community.post
19// UPDATE and DELETE handlers will be added when those features are implemented
20type PostEventConsumer struct {
21 postRepo posts.Repository
22 communityRepo communities.Repository
23 userService users.UserService
24 db *sql.DB // Direct DB access for atomic count reconciliation
25}
26
27// NewPostEventConsumer creates a new Jetstream consumer for post events
28func NewPostEventConsumer(
29 postRepo posts.Repository,
30 communityRepo communities.Repository,
31 userService users.UserService,
32 db *sql.DB,
33) *PostEventConsumer {
34 return &PostEventConsumer{
35 postRepo: postRepo,
36 communityRepo: communityRepo,
37 userService: userService,
38 db: db,
39 }
40}
41
42// HandleEvent processes a Jetstream event for post records
43// Currently only handles CREATE operations - UPDATE/DELETE deferred until those features exist
44func (c *PostEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
45 // We only care about commit events for post records
46 if event.Kind != "commit" || event.Commit == nil {
47 return nil
48 }
49
50 commit := event.Commit
51
52 // Only handle post record creation for now
53 // UPDATE and DELETE will be added when we implement those features
54 if commit.Collection == "social.coves.community.post" && commit.Operation == "create" {
55 return c.createPost(ctx, event.Did, commit)
56 }
57
58 // Silently ignore other operations (update, delete) and other collections
59 return nil
60}
61
62// createPost indexes a new post from the firehose
63func (c *PostEventConsumer) createPost(ctx context.Context, repoDID string, commit *CommitEvent) error {
64 if commit.Record == nil {
65 return fmt.Errorf("post create event missing record data")
66 }
67
68 // Parse the post record
69 postRecord, err := parsePostRecord(commit.Record)
70 if err != nil {
71 return fmt.Errorf("failed to parse post record: %w", err)
72 }
73
74 // SECURITY: Validate this is a legitimate post event
75 if err := c.validatePostEvent(ctx, repoDID, postRecord); err != nil {
76 log.Printf("🚨 SECURITY: Rejecting post event: %v", err)
77 return err
78 }
79
80 // Build AT-URI for this post
81 // Format: at://community_did/social.coves.community.post/rkey
82 uri := fmt.Sprintf("at://%s/social.coves.community.post/%s", repoDID, commit.RKey)
83
84 // Parse timestamp from record
85 createdAt, err := time.Parse(time.RFC3339, postRecord.CreatedAt)
86 if err != nil {
87 // Fallback to current time if parsing fails
88 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err)
89 createdAt = time.Now()
90 }
91
92 // Build post entity
93 post := &posts.Post{
94 URI: uri,
95 CID: commit.CID,
96 RKey: commit.RKey,
97 AuthorDID: postRecord.Author,
98 CommunityDID: postRecord.Community,
99 Title: postRecord.Title,
100 Content: postRecord.Content,
101 CreatedAt: createdAt,
102 IndexedAt: time.Now(),
103 // Stats remain at 0 (no votes yet)
104 UpvoteCount: 0,
105 DownvoteCount: 0,
106 Score: 0,
107 CommentCount: 0,
108 }
109
110 // Serialize JSON fields (facets, embed, labels)
111 if postRecord.Facets != nil {
112 facetsJSON, marshalErr := json.Marshal(postRecord.Facets)
113 if marshalErr == nil {
114 facetsStr := string(facetsJSON)
115 post.ContentFacets = &facetsStr
116 }
117 }
118
119 if postRecord.Embed != nil {
120 embedJSON, marshalErr := json.Marshal(postRecord.Embed)
121 if marshalErr == nil {
122 embedStr := string(embedJSON)
123 post.Embed = &embedStr
124 }
125 }
126
127 if postRecord.Labels != nil {
128 labelsJSON, marshalErr := json.Marshal(postRecord.Labels)
129 if marshalErr == nil {
130 labelsStr := string(labelsJSON)
131 post.ContentLabels = &labelsStr
132 }
133 }
134
135 // Atomically: Index post + Reconcile comment count for out-of-order arrivals
136 if err := c.indexPostAndReconcileCounts(ctx, post); err != nil {
137 return fmt.Errorf("failed to index post and reconcile counts: %w", err)
138 }
139
140 log.Printf("✓ Indexed post: %s (author: %s, community: %s, rkey: %s)",
141 uri, post.AuthorDID, post.CommunityDID, commit.RKey)
142 return nil
143}
144
145// indexPostAndReconcileCounts atomically indexes a post and reconciles comment counts
146// This fixes the race condition where comments arrive before their parent post
147func (c *PostEventConsumer) indexPostAndReconcileCounts(ctx context.Context, post *posts.Post) error {
148 tx, err := c.db.BeginTx(ctx, nil)
149 if err != nil {
150 return fmt.Errorf("failed to begin transaction: %w", err)
151 }
152 defer func() {
153 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone {
154 log.Printf("Failed to rollback transaction: %v", rollbackErr)
155 }
156 }()
157
158 // 1. Insert the post (idempotent with RETURNING clause)
159 var facetsJSON, embedJSON, labelsJSON sql.NullString
160
161 if post.ContentFacets != nil {
162 facetsJSON.String = *post.ContentFacets
163 facetsJSON.Valid = true
164 }
165
166 if post.Embed != nil {
167 embedJSON.String = *post.Embed
168 embedJSON.Valid = true
169 }
170
171 if post.ContentLabels != nil {
172 labelsJSON.String = *post.ContentLabels
173 labelsJSON.Valid = true
174 }
175
176 insertQuery := `
177 INSERT INTO posts (
178 uri, cid, rkey, author_did, community_did,
179 title, content, content_facets, embed, content_labels,
180 created_at, indexed_at
181 ) VALUES (
182 $1, $2, $3, $4, $5,
183 $6, $7, $8, $9, $10,
184 $11, NOW()
185 )
186 ON CONFLICT (uri) DO NOTHING
187 RETURNING id
188 `
189
190 var postID int64
191 insertErr := tx.QueryRowContext(
192 ctx, insertQuery,
193 post.URI, post.CID, post.RKey, post.AuthorDID, post.CommunityDID,
194 post.Title, post.Content, facetsJSON, embedJSON, labelsJSON,
195 post.CreatedAt,
196 ).Scan(&postID)
197
198 // If no rows returned, post already exists (idempotent - OK for Jetstream replays)
199 if insertErr == sql.ErrNoRows {
200 log.Printf("Post already indexed: %s (idempotent)", post.URI)
201 if commitErr := tx.Commit(); commitErr != nil {
202 return fmt.Errorf("failed to commit transaction: %w", commitErr)
203 }
204 return nil
205 }
206
207 if insertErr != nil {
208 return fmt.Errorf("failed to insert post: %w", insertErr)
209 }
210
211 // 2. Reconcile comment_count for this newly inserted post
212 // In case any comments arrived out-of-order before this post was indexed
213 // This is the CRITICAL FIX for the race condition identified in the PR review
214 reconcileQuery := `
215 UPDATE posts
216 SET comment_count = (
217 SELECT COUNT(*)
218 FROM comments c
219 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL
220 )
221 WHERE id = $2
222 `
223 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, post.URI, postID)
224 if reconcileErr != nil {
225 log.Printf("Warning: Failed to reconcile comment_count for %s: %v", post.URI, reconcileErr)
226 // Continue anyway - this is a best-effort reconciliation
227 }
228
229 // Commit transaction
230 if err := tx.Commit(); err != nil {
231 return fmt.Errorf("failed to commit transaction: %w", err)
232 }
233
234 return nil
235}
236
237// validatePostEvent performs security validation on post events
238// This prevents malicious actors from indexing fake posts
239func (c *PostEventConsumer) validatePostEvent(ctx context.Context, repoDID string, post *PostRecordFromJetstream) error {
240 // CRITICAL SECURITY CHECK:
241 // Posts MUST come from community repositories, not user repositories
242 // This prevents users from creating posts that appear to be from communities they don't control
243 //
244 // Example attack prevented:
245 // - User creates post in their own repo (at://user_did/social.coves.community.post/xyz)
246 // - Claims it's for community X (community field = community_did)
247 // - Without this check, fake post would be indexed
248 //
249 // With this check:
250 // - We verify event.Did (repo owner) == post.community (claimed community)
251 // - Reject if mismatch
252 if repoDID != post.Community {
253 return fmt.Errorf("repository DID (%s) doesn't match community DID (%s) - posts must come from community repos",
254 repoDID, post.Community)
255 }
256
257 // CRITICAL: Verify community exists in AppView
258 // Posts MUST reference valid communities (enforced by FK constraint)
259 // If community isn't indexed yet, we must reject the post
260 // Jetstream will replay events, so the post will be indexed once community is ready
261 _, err := c.communityRepo.GetByDID(ctx, post.Community)
262 if err != nil {
263 if communities.IsNotFound(err) {
264 // Reject - community must be indexed before posts
265 // This maintains referential integrity and prevents orphaned posts
266 return fmt.Errorf("community not found: %s - cannot index post before community", post.Community)
267 }
268 // Database error or other issue
269 return fmt.Errorf("failed to verify community exists: %w", err)
270 }
271
272 // CRITICAL: Verify author exists in AppView
273 // Every post MUST have a valid author (enforced by FK constraint)
274 // Even though posts live in community repos, they belong to specific authors
275 // If author isn't indexed yet, we must reject the post
276 _, err = c.userService.GetUserByDID(ctx, post.Author)
277 if err != nil {
278 // Check if it's a "not found" error using string matching
279 // (users package doesn't export IsNotFound)
280 if err.Error() == "user not found" || strings.Contains(err.Error(), "not found") {
281 // Reject - author must be indexed before posts
282 // This maintains referential integrity and prevents orphaned posts
283 return fmt.Errorf("author not found: %s - cannot index post before author", post.Author)
284 }
285 // Database error or other issue
286 return fmt.Errorf("failed to verify author exists: %w", err)
287 }
288
289 return nil
290}
291
292// PostRecordFromJetstream represents a post record as received from Jetstream
293// Matches the structure written to PDS via social.coves.community.post
294type PostRecordFromJetstream struct {
295 OriginalAuthor interface{} `json:"originalAuthor,omitempty"`
296 FederatedFrom interface{} `json:"federatedFrom,omitempty"`
297 Location interface{} `json:"location,omitempty"`
298 Title *string `json:"title,omitempty"`
299 Content *string `json:"content,omitempty"`
300 Embed map[string]interface{} `json:"embed,omitempty"`
301 Labels *posts.SelfLabels `json:"labels,omitempty"`
302 Type string `json:"$type"`
303 Community string `json:"community"`
304 Author string `json:"author"`
305 CreatedAt string `json:"createdAt"`
306 Facets []interface{} `json:"facets,omitempty"`
307}
308
309// parsePostRecord converts a raw Jetstream record map to a PostRecordFromJetstream
310func parsePostRecord(record map[string]interface{}) (*PostRecordFromJetstream, error) {
311 // Marshal to JSON and back to ensure proper type conversion
312 recordJSON, err := json.Marshal(record)
313 if err != nil {
314 return nil, fmt.Errorf("failed to marshal record: %w", err)
315 }
316
317 var post PostRecordFromJetstream
318 if err := json.Unmarshal(recordJSON, &post); err != nil {
319 return nil, fmt.Errorf("failed to unmarshal post record: %w", err)
320 }
321
322 // Validate required fields
323 if post.Community == "" {
324 return nil, fmt.Errorf("post record missing community field")
325 }
326 if post.Author == "" {
327 return nil, fmt.Errorf("post record missing author field")
328 }
329 if post.CreatedAt == "" {
330 return nil, fmt.Errorf("post record missing createdAt field")
331 }
332
333 return &post, nil
334}