// A community-based topic aggregation platform built on atproto.
1package jetstream 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "fmt" 8 "log" 9 "strings" 10 "time" 11 12 "Coves/internal/core/communities" 13 "Coves/internal/core/posts" 14 "Coves/internal/core/users" 15) 16 17// PostEventConsumer consumes post-related events from Jetstream 18// Currently handles only CREATE operations for social.coves.community.post 19// UPDATE and DELETE handlers will be added when those features are implemented 20type PostEventConsumer struct { 21 postRepo posts.Repository 22 communityRepo communities.Repository 23 userService users.UserService 24 db *sql.DB // Direct DB access for atomic count reconciliation 25} 26 27// NewPostEventConsumer creates a new Jetstream consumer for post events 28func NewPostEventConsumer( 29 postRepo posts.Repository, 30 communityRepo communities.Repository, 31 userService users.UserService, 32 db *sql.DB, 33) *PostEventConsumer { 34 return &PostEventConsumer{ 35 postRepo: postRepo, 36 communityRepo: communityRepo, 37 userService: userService, 38 db: db, 39 } 40} 41 42// HandleEvent processes a Jetstream event for post records 43// Currently only handles CREATE operations - UPDATE/DELETE deferred until those features exist 44func (c *PostEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 45 // We only care about commit events for post records 46 if event.Kind != "commit" || event.Commit == nil { 47 return nil 48 } 49 50 commit := event.Commit 51 52 // Only handle post record creation for now 53 // UPDATE and DELETE will be added when we implement those features 54 if commit.Collection == "social.coves.community.post" && commit.Operation == "create" { 55 return c.createPost(ctx, event.Did, commit) 56 } 57 58 // Silently ignore other operations (update, delete) and other collections 59 return nil 60} 61 62// createPost indexes a new post from the firehose 63func (c *PostEventConsumer) createPost(ctx context.Context, repoDID string, commit *CommitEvent) error { 64 if 
commit.Record == nil { 65 return fmt.Errorf("post create event missing record data") 66 } 67 68 // Parse the post record 69 postRecord, err := parsePostRecord(commit.Record) 70 if err != nil { 71 return fmt.Errorf("failed to parse post record: %w", err) 72 } 73 74 // SECURITY: Validate this is a legitimate post event 75 if err := c.validatePostEvent(ctx, repoDID, postRecord); err != nil { 76 log.Printf("🚨 SECURITY: Rejecting post event: %v", err) 77 return err 78 } 79 80 // Build AT-URI for this post 81 // Format: at://community_did/social.coves.community.post/rkey 82 uri := fmt.Sprintf("at://%s/social.coves.community.post/%s", repoDID, commit.RKey) 83 84 // Parse timestamp from record 85 createdAt, err := time.Parse(time.RFC3339, postRecord.CreatedAt) 86 if err != nil { 87 // Fallback to current time if parsing fails 88 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err) 89 createdAt = time.Now() 90 } 91 92 // Build post entity 93 post := &posts.Post{ 94 URI: uri, 95 CID: commit.CID, 96 RKey: commit.RKey, 97 AuthorDID: postRecord.Author, 98 CommunityDID: postRecord.Community, 99 Title: postRecord.Title, 100 Content: postRecord.Content, 101 CreatedAt: createdAt, 102 IndexedAt: time.Now(), 103 // Stats remain at 0 (no votes yet) 104 UpvoteCount: 0, 105 DownvoteCount: 0, 106 Score: 0, 107 CommentCount: 0, 108 } 109 110 // Serialize JSON fields (facets, embed, labels) 111 if postRecord.Facets != nil { 112 facetsJSON, marshalErr := json.Marshal(postRecord.Facets) 113 if marshalErr == nil { 114 facetsStr := string(facetsJSON) 115 post.ContentFacets = &facetsStr 116 } 117 } 118 119 if postRecord.Embed != nil { 120 embedJSON, marshalErr := json.Marshal(postRecord.Embed) 121 if marshalErr == nil { 122 embedStr := string(embedJSON) 123 post.Embed = &embedStr 124 } 125 } 126 127 if postRecord.Labels != nil { 128 labelsJSON, marshalErr := json.Marshal(postRecord.Labels) 129 if marshalErr == nil { 130 labelsStr := string(labelsJSON) 131 
post.ContentLabels = &labelsStr 132 } 133 } 134 135 // Atomically: Index post + Reconcile comment count for out-of-order arrivals 136 if err := c.indexPostAndReconcileCounts(ctx, post); err != nil { 137 return fmt.Errorf("failed to index post and reconcile counts: %w", err) 138 } 139 140 log.Printf("✓ Indexed post: %s (author: %s, community: %s, rkey: %s)", 141 uri, post.AuthorDID, post.CommunityDID, commit.RKey) 142 return nil 143} 144 145// indexPostAndReconcileCounts atomically indexes a post and reconciles comment counts 146// This fixes the race condition where comments arrive before their parent post 147func (c *PostEventConsumer) indexPostAndReconcileCounts(ctx context.Context, post *posts.Post) error { 148 tx, err := c.db.BeginTx(ctx, nil) 149 if err != nil { 150 return fmt.Errorf("failed to begin transaction: %w", err) 151 } 152 defer func() { 153 if rollbackErr := tx.Rollback(); rollbackErr != nil && rollbackErr != sql.ErrTxDone { 154 log.Printf("Failed to rollback transaction: %v", rollbackErr) 155 } 156 }() 157 158 // 1. 
Insert the post (idempotent with RETURNING clause) 159 var facetsJSON, embedJSON, labelsJSON sql.NullString 160 161 if post.ContentFacets != nil { 162 facetsJSON.String = *post.ContentFacets 163 facetsJSON.Valid = true 164 } 165 166 if post.Embed != nil { 167 embedJSON.String = *post.Embed 168 embedJSON.Valid = true 169 } 170 171 if post.ContentLabels != nil { 172 labelsJSON.String = *post.ContentLabels 173 labelsJSON.Valid = true 174 } 175 176 insertQuery := ` 177 INSERT INTO posts ( 178 uri, cid, rkey, author_did, community_did, 179 title, content, content_facets, embed, content_labels, 180 created_at, indexed_at 181 ) VALUES ( 182 $1, $2, $3, $4, $5, 183 $6, $7, $8, $9, $10, 184 $11, NOW() 185 ) 186 ON CONFLICT (uri) DO NOTHING 187 RETURNING id 188 ` 189 190 var postID int64 191 insertErr := tx.QueryRowContext( 192 ctx, insertQuery, 193 post.URI, post.CID, post.RKey, post.AuthorDID, post.CommunityDID, 194 post.Title, post.Content, facetsJSON, embedJSON, labelsJSON, 195 post.CreatedAt, 196 ).Scan(&postID) 197 198 // If no rows returned, post already exists (idempotent - OK for Jetstream replays) 199 if insertErr == sql.ErrNoRows { 200 log.Printf("Post already indexed: %s (idempotent)", post.URI) 201 if commitErr := tx.Commit(); commitErr != nil { 202 return fmt.Errorf("failed to commit transaction: %w", commitErr) 203 } 204 return nil 205 } 206 207 if insertErr != nil { 208 return fmt.Errorf("failed to insert post: %w", insertErr) 209 } 210 211 // 2. 
Reconcile comment_count for this newly inserted post 212 // In case any comments arrived out-of-order before this post was indexed 213 // This is the CRITICAL FIX for the race condition identified in the PR review 214 reconcileQuery := ` 215 UPDATE posts 216 SET comment_count = ( 217 SELECT COUNT(*) 218 FROM comments c 219 WHERE c.parent_uri = $1 AND c.deleted_at IS NULL 220 ) 221 WHERE id = $2 222 ` 223 _, reconcileErr := tx.ExecContext(ctx, reconcileQuery, post.URI, postID) 224 if reconcileErr != nil { 225 log.Printf("Warning: Failed to reconcile comment_count for %s: %v", post.URI, reconcileErr) 226 // Continue anyway - this is a best-effort reconciliation 227 } 228 229 // Commit transaction 230 if err := tx.Commit(); err != nil { 231 return fmt.Errorf("failed to commit transaction: %w", err) 232 } 233 234 return nil 235} 236 237// validatePostEvent performs security validation on post events 238// This prevents malicious actors from indexing fake posts 239func (c *PostEventConsumer) validatePostEvent(ctx context.Context, repoDID string, post *PostRecordFromJetstream) error { 240 // CRITICAL SECURITY CHECK: 241 // Posts MUST come from community repositories, not user repositories 242 // This prevents users from creating posts that appear to be from communities they don't control 243 // 244 // Example attack prevented: 245 // - User creates post in their own repo (at://user_did/social.coves.community.post/xyz) 246 // - Claims it's for community X (community field = community_did) 247 // - Without this check, fake post would be indexed 248 // 249 // With this check: 250 // - We verify event.Did (repo owner) == post.community (claimed community) 251 // - Reject if mismatch 252 if repoDID != post.Community { 253 return fmt.Errorf("repository DID (%s) doesn't match community DID (%s) - posts must come from community repos", 254 repoDID, post.Community) 255 } 256 257 // CRITICAL: Verify community exists in AppView 258 // Posts MUST reference valid communities 
(enforced by FK constraint) 259 // If community isn't indexed yet, we must reject the post 260 // Jetstream will replay events, so the post will be indexed once community is ready 261 _, err := c.communityRepo.GetByDID(ctx, post.Community) 262 if err != nil { 263 if communities.IsNotFound(err) { 264 // Reject - community must be indexed before posts 265 // This maintains referential integrity and prevents orphaned posts 266 return fmt.Errorf("community not found: %s - cannot index post before community", post.Community) 267 } 268 // Database error or other issue 269 return fmt.Errorf("failed to verify community exists: %w", err) 270 } 271 272 // CRITICAL: Verify author exists in AppView 273 // Every post MUST have a valid author (enforced by FK constraint) 274 // Even though posts live in community repos, they belong to specific authors 275 // If author isn't indexed yet, we must reject the post 276 _, err = c.userService.GetUserByDID(ctx, post.Author) 277 if err != nil { 278 // Check if it's a "not found" error using string matching 279 // (users package doesn't export IsNotFound) 280 if err.Error() == "user not found" || strings.Contains(err.Error(), "not found") { 281 // Reject - author must be indexed before posts 282 // This maintains referential integrity and prevents orphaned posts 283 return fmt.Errorf("author not found: %s - cannot index post before author", post.Author) 284 } 285 // Database error or other issue 286 return fmt.Errorf("failed to verify author exists: %w", err) 287 } 288 289 return nil 290} 291 292// PostRecordFromJetstream represents a post record as received from Jetstream 293// Matches the structure written to PDS via social.coves.community.post 294type PostRecordFromJetstream struct { 295 OriginalAuthor interface{} `json:"originalAuthor,omitempty"` 296 FederatedFrom interface{} `json:"federatedFrom,omitempty"` 297 Location interface{} `json:"location,omitempty"` 298 Title *string `json:"title,omitempty"` 299 Content *string 
`json:"content,omitempty"` 300 Embed map[string]interface{} `json:"embed,omitempty"` 301 Labels *posts.SelfLabels `json:"labels,omitempty"` 302 Type string `json:"$type"` 303 Community string `json:"community"` 304 Author string `json:"author"` 305 CreatedAt string `json:"createdAt"` 306 Facets []interface{} `json:"facets,omitempty"` 307} 308 309// parsePostRecord converts a raw Jetstream record map to a PostRecordFromJetstream 310func parsePostRecord(record map[string]interface{}) (*PostRecordFromJetstream, error) { 311 // Marshal to JSON and back to ensure proper type conversion 312 recordJSON, err := json.Marshal(record) 313 if err != nil { 314 return nil, fmt.Errorf("failed to marshal record: %w", err) 315 } 316 317 var post PostRecordFromJetstream 318 if err := json.Unmarshal(recordJSON, &post); err != nil { 319 return nil, fmt.Errorf("failed to unmarshal post record: %w", err) 320 } 321 322 // Validate required fields 323 if post.Community == "" { 324 return nil, fmt.Errorf("post record missing community field") 325 } 326 if post.Author == "" { 327 return nil, fmt.Errorf("post record missing author field") 328 } 329 if post.CreatedAt == "" { 330 return nil, fmt.Errorf("post record missing createdAt field") 331 } 332 333 return &post, nil 334}