A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/core/communities" 5 "Coves/internal/core/posts" 6 "Coves/internal/core/users" 7 "context" 8 "database/sql" 9 "encoding/json" 10 "fmt" 11 "log" 12 "strings" 13 "time" 14) 15 16// PostEventConsumer consumes post-related events from Jetstream 17// Currently handles only CREATE operations for social.coves.community.post 18// UPDATE and DELETE handlers will be added when those features are implemented 19type PostEventConsumer struct { 20 postRepo posts.Repository 21 communityRepo communities.Repository 22 userService users.UserService 23 db *sql.DB // Direct DB access for atomic count reconciliation 24} 25 26// NewPostEventConsumer creates a new Jetstream consumer for post events 27func NewPostEventConsumer( 28 postRepo posts.Repository, 29 communityRepo communities.Repository, 30 userService users.UserService, 31 db *sql.DB, 32) *PostEventConsumer { 33 return &PostEventConsumer{ 34 postRepo: postRepo, 35 communityRepo: communityRepo, 36 userService: userService, 37 db: db, 38 } 39} 40 41// HandleEvent processes a Jetstream event for post records 42// Currently only handles CREATE operations - UPDATE/DELETE deferred until those features exist 43func (c *PostEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 44 // We only care about commit events for post records 45 if event.Kind != "commit" || event.Commit == nil { 46 return nil 47 } 48 49 commit := event.Commit 50 51 // Only handle post record creation for now 52 // UPDATE and DELETE will be added when we implement those features 53 if commit.Collection == "social.coves.community.post" && commit.Operation == "create" { 54 return c.createPost(ctx, event.Did, commit) 55 } 56 57 // Silently ignore other operations (update, delete) and other collections 58 return nil 59} 60 61// createPost indexes a new post from the firehose 62func (c *PostEventConsumer) createPost(ctx context.Context, repoDID string, commit *CommitEvent) error { 63 if 
commit.Record == nil { 64 return fmt.Errorf("post create event missing record data") 65 } 66 67 // Parse the post record 68 postRecord, err := parsePostRecord(commit.Record) 69 if err != nil { 70 return fmt.Errorf("failed to parse post record: %w", err) 71 } 72 73 // SECURITY: Validate this is a legitimate post event 74 if err := c.validatePostEvent(ctx, repoDID, postRecord); err != nil { 75 log.Printf("🚨 SECURITY: Rejecting post event: %v", err) 76 return err 77 } 78 79 // Build AT-URI for this post 80 // Format: at://community_did/social.coves.community.post/rkey 81 uri := fmt.Sprintf("at://%s/social.coves.community.post/%s", repoDID, commit.RKey) 82 83 // Parse timestamp from record 84 createdAt, err := time.Parse(time.RFC3339, postRecord.CreatedAt) 85 if err != nil { 86 // Fallback to current time if parsing fails 87 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err) 88 createdAt = time.Now() 89 } 90 91 // Build post entity 92 post := &posts.Post{ 93 URI: uri, 94 CID: commit.CID, 95 RKey: commit.RKey, 96 AuthorDID: postRecord.Author, 97 CommunityDID: postRecord.Community, 98 Title: postRecord.Title, 99 Content: postRecord.Content, 100 CreatedAt: createdAt, 101 IndexedAt: time.Now(), 102 // Stats remain at 0 (no votes yet) 103 UpvoteCount: 0, 104 DownvoteCount: 0, 105 Score: 0, 106 CommentCount: 0, 107 } 108 109 // Serialize JSON fields (facets, embed, labels) 110 if postRecord.Facets != nil { 111 facetsJSON, marshalErr := json.Marshal(postRecord.Facets) 112 if marshalErr == nil { 113 facetsStr := string(facetsJSON) 114 post.ContentFacets = &facetsStr 115 } 116 } 117 118 if postRecord.Embed != nil { 119 embedJSON, marshalErr := json.Marshal(postRecord.Embed) 120 if marshalErr == nil { 121 embedStr := string(embedJSON) 122 post.Embed = &embedStr 123 } 124 } 125 126 if postRecord.Labels != nil { 127 labelsJSON, marshalErr := json.Marshal(postRecord.Labels) 128 if marshalErr == nil { 129 labelsStr := string(labelsJSON) 130 
post.ContentLabels = &labelsStr 131 } 132 } 133 134 // Atomically: Index post + Reconcile comment count for out-of-order arrivals 135 if err := c.indexPostAndReconcileCounts(ctx, post); err != nil { 136 return fmt.Errorf("failed to index post and reconcile counts: %w", err) 137 } 138 139 log.Printf("✓ Indexed post: %s (author: %s, community: %s, rkey: %s)", 140 uri, post.AuthorDID, post.CommunityDID, commit.RKey) 141 return nil 142} 143 144// indexPostAndReconcileCounts atomically indexes a post and reconciles comment counts 145// This fixes the race condition where comments arrive before their parent post 146func (c *PostEventConsumer) indexPostAndReconcileCounts(ctx context.Context, post *posts.Post) error { 147 tx, err := c.db.BeginTx(ctx, nil) 148 if err != nil { 149 return fmt.Errorf("failed to begin transaction: %w", err) 150 } 151 defer func() { 152 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 153 log.Printf("Failed to rollback transaction: %v", rollbackErr) 154 } 155 }() 156 157 // 1. 
Insert the post (idempotent with RETURNING clause) 158 var facetsJSON, embedJSON, labelsJSON sql.NullString 159 160 if post.ContentFacets != nil { 161 facetsJSON.String = *post.ContentFacets 162 facetsJSON.Valid = true 163 } 164 165 if post.Embed != nil { 166 embedJSON.String = *post.Embed 167 embedJSON.Valid = true 168 } 169 170 if post.ContentLabels != nil { 171 labelsJSON.String = *post.ContentLabels 172 labelsJSON.Valid = true 173 } 174 175 insertQuery := ` 176 INSERT INTO posts ( 177 uri, cid, rkey, author_did, community_did, 178 title, content, content_facets, embed, content_labels, 179 created_at, indexed_at 180 ) VALUES ( 181 $1, $2, $3, $4, $5, 182 $6, $7, $8, $9, $10, 183 $11, NOW() 184 ) 185 ON CONFLICT (uri) DO NOTHING 186 RETURNING id 187 ` 188 189 var postID int64 190 insertErr := tx.QueryRowContext( 191 ctx, insertQuery, 192 post.URI, post.CID, post.RKey, post.AuthorDID, post.CommunityDID, 193 post.Title, post.Content, facetsJSON, embedJSON, labelsJSON, 194 post.CreatedAt, 195 ).Scan(&postID) 196 197 // If no rows returned, post already exists (idempotent - OK for Jetstream replays) 198 if insertErr == sql.ErrNoRows { 199 log.Printf("Post already indexed: %s (idempotent)", post.URI) 200 if commitErr := tx.Commit(); commitErr != nil { 201 return fmt.Errorf("failed to commit transaction: %w", commitErr) 202 } 203 return nil 204 } 205 206 if insertErr != nil { 207 return fmt.Errorf("failed to insert post: %w", insertErr) 208 } 209 210 // 2. 
Reconcile comment_count for this newly inserted post 211 // In case any comments arrived out-of-order before this post was indexed 212 // This is the CRITICAL FIX for the race condition identified in the PR review 213 reconcileQuery := ` 214 UPDATE posts 215 SET comment_count = ( 216 SELECT COUNT(*) 217 FROM comments c 218 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL 219 ) 220 WHERE id = $2 221 ` 222 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, post.URI, postID) 223 if reconcileErr != nil { 224 log.Printf("Warning: Failed to reconcile comment_count for %s: %v", post.URI, reconcileErr) 225 // Continue anyway - this is a best-effort reconciliation 226 } 227 228 // Commit transaction 229 if err := tx.Commit(); err != nil { 230 return fmt.Errorf("failed to commit transaction: %w", err) 231 } 232 233 return nil 234} 235 236// validatePostEvent performs security validation on post events 237// This prevents malicious actors from indexing fake posts 238func (c *PostEventConsumer) validatePostEvent(ctx context.Context, repoDID string, post *PostRecordFromJetstream) error { 239 // CRITICAL SECURITY CHECK: 240 // Posts MUST come from community repositories, not user repositories 241 // This prevents users from creating posts that appear to be from communities they don't control 242 // 243 // Example attack prevented: 244 // - User creates post in their own repo (at://user_did/social.coves.community.post/xyz) 245 // - Claims it's for community X (community field = community_did) 246 // - Without this check, fake post would be indexed 247 // 248 // With this check: 249 // - We verify event.Did (repo owner) == post.community (claimed community) 250 // - Reject if mismatch 251 if repoDID != post.Community { 252 return fmt.Errorf("repository DID (%s) doesn't match community DID (%s) - posts must come from community repos", 253 repoDID, post.Community) 254 } 255 256 // CRITICAL: Verify community exists in AppView 257 // Posts MUST reference valid communities 
(enforced by FK constraint) 258 // If community isn't indexed yet, we must reject the post 259 // Jetstream will replay events, so the post will be indexed once community is ready 260 _, err := c.communityRepo.GetByDID(ctx, post.Community) 261 if err != nil { 262 if communities.IsNotFound(err) { 263 // Reject - community must be indexed before posts 264 // This maintains referential integrity and prevents orphaned posts 265 return fmt.Errorf("community not found: %s - cannot index post before community", post.Community) 266 } 267 // Database error or other issue 268 return fmt.Errorf("failed to verify community exists: %w", err) 269 } 270 271 // CRITICAL: Verify author exists in AppView 272 // Every post MUST have a valid author (enforced by FK constraint) 273 // Even though posts live in community repos, they belong to specific authors 274 // If author isn't indexed yet, we must reject the post 275 _, err = c.userService.GetUserByDID(ctx, post.Author) 276 if err != nil { 277 // Check if it's a "not found" error using string matching 278 // (users package doesn't export IsNotFound) 279 if err.Error() == "user not found" || strings.Contains(err.Error(), "not found") { 280 // Reject - author must be indexed before posts 281 // This maintains referential integrity and prevents orphaned posts 282 return fmt.Errorf("author not found: %s - cannot index post before author", post.Author) 283 } 284 // Database error or other issue 285 return fmt.Errorf("failed to verify author exists: %w", err) 286 } 287 288 return nil 289} 290 291// PostRecordFromJetstream represents a post record as received from Jetstream 292// Matches the structure written to PDS via social.coves.community.post 293type PostRecordFromJetstream struct { 294 OriginalAuthor interface{} `json:"originalAuthor,omitempty"` 295 FederatedFrom interface{} `json:"federatedFrom,omitempty"` 296 Location interface{} `json:"location,omitempty"` 297 Title *string `json:"title,omitempty"` 298 Content *string 
`json:"content,omitempty"` 299 Embed map[string]interface{} `json:"embed,omitempty"` 300 Labels *posts.SelfLabels `json:"labels,omitempty"` 301 Type string `json:"$type"` 302 Community string `json:"community"` 303 Author string `json:"author"` 304 CreatedAt string `json:"createdAt"` 305 Facets []interface{} `json:"facets,omitempty"` 306} 307 308// parsePostRecord converts a raw Jetstream record map to a PostRecordFromJetstream 309func parsePostRecord(record map[string]interface{}) (*PostRecordFromJetstream, error) { 310 // Marshal to JSON and back to ensure proper type conversion 311 recordJSON, err := json.Marshal(record) 312 if err != nil { 313 return nil, fmt.Errorf("failed to marshal record: %w", err) 314 } 315 316 var post PostRecordFromJetstream 317 if err := json.Unmarshal(recordJSON, &post); err != nil { 318 return nil, fmt.Errorf("failed to unmarshal post record: %w", err) 319 } 320 321 // Validate required fields 322 if post.Community == "" { 323 return nil, fmt.Errorf("post record missing community field") 324 } 325 if post.Author == "" { 326 return nil, fmt.Errorf("post record missing author field") 327 } 328 if post.CreatedAt == "" { 329 return nil, fmt.Errorf("post record missing createdAt field") 330 } 331 332 return &post, nil 333}