A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/core/communities" 5 "Coves/internal/core/posts" 6 "Coves/internal/core/users" 7 "context" 8 "encoding/json" 9 "fmt" 10 "log" 11 "strings" 12 "time" 13) 14 15// PostEventConsumer consumes post-related events from Jetstream 16// Currently handles only CREATE operations for social.coves.post.record 17// UPDATE and DELETE handlers will be added when those features are implemented 18type PostEventConsumer struct { 19 postRepo posts.Repository 20 communityRepo communities.Repository 21 userService users.UserService 22} 23 24// NewPostEventConsumer creates a new Jetstream consumer for post events 25func NewPostEventConsumer( 26 postRepo posts.Repository, 27 communityRepo communities.Repository, 28 userService users.UserService, 29) *PostEventConsumer { 30 return &PostEventConsumer{ 31 postRepo: postRepo, 32 communityRepo: communityRepo, 33 userService: userService, 34 } 35} 36 37// HandleEvent processes a Jetstream event for post records 38// Currently only handles CREATE operations - UPDATE/DELETE deferred until those features exist 39func (c *PostEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error { 40 // We only care about commit events for post records 41 if event.Kind != "commit" || event.Commit == nil { 42 return nil 43 } 44 45 commit := event.Commit 46 47 // Only handle post record creation for now 48 // UPDATE and DELETE will be added when we implement those features 49 if commit.Collection == "social.coves.post.record" && commit.Operation == "create" { 50 return c.createPost(ctx, event.Did, commit) 51 } 52 53 // Silently ignore other operations (update, delete) and other collections 54 return nil 55} 56 57// createPost indexes a new post from the firehose 58func (c *PostEventConsumer) createPost(ctx context.Context, repoDID string, commit *CommitEvent) error { 59 if commit.Record == nil { 60 return fmt.Errorf("post create event missing record data") 61 } 62 63 // Parse the post record 64 
postRecord, err := parsePostRecord(commit.Record) 65 if err != nil { 66 return fmt.Errorf("failed to parse post record: %w", err) 67 } 68 69 // SECURITY: Validate this is a legitimate post event 70 if err := c.validatePostEvent(ctx, repoDID, postRecord); err != nil { 71 log.Printf("🚨 SECURITY: Rejecting post event: %v", err) 72 return err 73 } 74 75 // Build AT-URI for this post 76 // Format: at://community_did/social.coves.post.record/rkey 77 uri := fmt.Sprintf("at://%s/social.coves.post.record/%s", repoDID, commit.RKey) 78 79 // Parse timestamp from record 80 createdAt, err := time.Parse(time.RFC3339, postRecord.CreatedAt) 81 if err != nil { 82 // Fallback to current time if parsing fails 83 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err) 84 createdAt = time.Now() 85 } 86 87 // Build post entity 88 post := &posts.Post{ 89 URI: uri, 90 CID: commit.CID, 91 RKey: commit.RKey, 92 AuthorDID: postRecord.Author, 93 CommunityDID: postRecord.Community, 94 Title: postRecord.Title, 95 Content: postRecord.Content, 96 CreatedAt: createdAt, 97 IndexedAt: time.Now(), 98 // Stats remain at 0 (no votes yet) 99 UpvoteCount: 0, 100 DownvoteCount: 0, 101 Score: 0, 102 CommentCount: 0, 103 } 104 105 // Serialize JSON fields (facets, embed, labels) 106 if postRecord.Facets != nil { 107 facetsJSON, marshalErr := json.Marshal(postRecord.Facets) 108 if marshalErr == nil { 109 facetsStr := string(facetsJSON) 110 post.ContentFacets = &facetsStr 111 } 112 } 113 114 if postRecord.Embed != nil { 115 embedJSON, marshalErr := json.Marshal(postRecord.Embed) 116 if marshalErr == nil { 117 embedStr := string(embedJSON) 118 post.Embed = &embedStr 119 } 120 } 121 122 if len(postRecord.ContentLabels) > 0 { 123 labelsJSON, marshalErr := json.Marshal(postRecord.ContentLabels) 124 if marshalErr == nil { 125 labelsStr := string(labelsJSON) 126 post.ContentLabels = &labelsStr 127 } 128 } 129 130 // Index in AppView database (idempotent - safe for Jetstream replays) 
131 err = c.postRepo.Create(ctx, post) 132 if err != nil { 133 // Check if it already exists (idempotency) 134 if posts.IsConflict(err) { 135 log.Printf("Post already indexed: %s", uri) 136 return nil 137 } 138 return fmt.Errorf("failed to index post: %w", err) 139 } 140 141 log.Printf("✓ Indexed post: %s (author: %s, community: %s, rkey: %s)", 142 uri, post.AuthorDID, post.CommunityDID, commit.RKey) 143 return nil 144} 145 146// validatePostEvent performs security validation on post events 147// This prevents malicious actors from indexing fake posts 148func (c *PostEventConsumer) validatePostEvent(ctx context.Context, repoDID string, post *PostRecordFromJetstream) error { 149 // CRITICAL SECURITY CHECK: 150 // Posts MUST come from community repositories, not user repositories 151 // This prevents users from creating posts that appear to be from communities they don't control 152 // 153 // Example attack prevented: 154 // - User creates post in their own repo (at://user_did/social.coves.post.record/xyz) 155 // - Claims it's for community X (community field = community_did) 156 // - Without this check, fake post would be indexed 157 // 158 // With this check: 159 // - We verify event.Did (repo owner) == post.community (claimed community) 160 // - Reject if mismatch 161 if repoDID != post.Community { 162 return fmt.Errorf("repository DID (%s) doesn't match community DID (%s) - posts must come from community repos", 163 repoDID, post.Community) 164 } 165 166 // CRITICAL: Verify community exists in AppView 167 // Posts MUST reference valid communities (enforced by FK constraint) 168 // If community isn't indexed yet, we must reject the post 169 // Jetstream will replay events, so the post will be indexed once community is ready 170 _, err := c.communityRepo.GetByDID(ctx, post.Community) 171 if err != nil { 172 if communities.IsNotFound(err) { 173 // Reject - community must be indexed before posts 174 // This maintains referential integrity and prevents orphaned posts 
175 return fmt.Errorf("community not found: %s - cannot index post before community", post.Community) 176 } 177 // Database error or other issue 178 return fmt.Errorf("failed to verify community exists: %w", err) 179 } 180 181 // CRITICAL: Verify author exists in AppView 182 // Every post MUST have a valid author (enforced by FK constraint) 183 // Even though posts live in community repos, they belong to specific authors 184 // If author isn't indexed yet, we must reject the post 185 _, err = c.userService.GetUserByDID(ctx, post.Author) 186 if err != nil { 187 // Check if it's a "not found" error using string matching 188 // (users package doesn't export IsNotFound) 189 if err.Error() == "user not found" || strings.Contains(err.Error(), "not found") { 190 // Reject - author must be indexed before posts 191 // This maintains referential integrity and prevents orphaned posts 192 return fmt.Errorf("author not found: %s - cannot index post before author", post.Author) 193 } 194 // Database error or other issue 195 return fmt.Errorf("failed to verify author exists: %w", err) 196 } 197 198 return nil 199} 200 201// PostRecordFromJetstream represents a post record as received from Jetstream 202// Matches the structure written to PDS via social.coves.post.record 203type PostRecordFromJetstream struct { 204 OriginalAuthor interface{} `json:"originalAuthor,omitempty"` 205 FederatedFrom interface{} `json:"federatedFrom,omitempty"` 206 Location interface{} `json:"location,omitempty"` 207 Title *string `json:"title,omitempty"` 208 Content *string `json:"content,omitempty"` 209 Embed map[string]interface{} `json:"embed,omitempty"` 210 Type string `json:"$type"` 211 Community string `json:"community"` 212 Author string `json:"author"` 213 CreatedAt string `json:"createdAt"` 214 Facets []interface{} `json:"facets,omitempty"` 215 ContentLabels []string `json:"contentLabels,omitempty"` 216} 217 218// parsePostRecord converts a raw Jetstream record map to a PostRecordFromJetstream 
219func parsePostRecord(record map[string]interface{}) (*PostRecordFromJetstream, error) { 220 // Marshal to JSON and back to ensure proper type conversion 221 recordJSON, err := json.Marshal(record) 222 if err != nil { 223 return nil, fmt.Errorf("failed to marshal record: %w", err) 224 } 225 226 var post PostRecordFromJetstream 227 if err := json.Unmarshal(recordJSON, &post); err != nil { 228 return nil, fmt.Errorf("failed to unmarshal post record: %w", err) 229 } 230 231 // Validate required fields 232 if post.Community == "" { 233 return nil, fmt.Errorf("post record missing community field") 234 } 235 if post.Author == "" { 236 return nil, fmt.Errorf("post record missing author field") 237 } 238 if post.CreatedAt == "" { 239 return nil, fmt.Errorf("post record missing createdAt field") 240 } 241 242 return &post, nil 243}