A community based topic aggregation platform built on atproto
1package jetstream
2
3import (
4 "Coves/internal/core/communities"
5 "Coves/internal/core/posts"
6 "Coves/internal/core/users"
7 "context"
8 "encoding/json"
9 "fmt"
10 "log"
11 "strings"
12 "time"
13)
14
15// PostEventConsumer consumes post-related events from Jetstream
16// Currently handles only CREATE operations for social.coves.post.record
17// UPDATE and DELETE handlers will be added when those features are implemented
18type PostEventConsumer struct {
19 postRepo posts.Repository
20 communityRepo communities.Repository
21 userService users.UserService
22}
23
24// NewPostEventConsumer creates a new Jetstream consumer for post events
25func NewPostEventConsumer(
26 postRepo posts.Repository,
27 communityRepo communities.Repository,
28 userService users.UserService,
29) *PostEventConsumer {
30 return &PostEventConsumer{
31 postRepo: postRepo,
32 communityRepo: communityRepo,
33 userService: userService,
34 }
35}
36
37// HandleEvent processes a Jetstream event for post records
38// Currently only handles CREATE operations - UPDATE/DELETE deferred until those features exist
39func (c *PostEventConsumer) HandleEvent(ctx context.Context, event *JetstreamEvent) error {
40 // We only care about commit events for post records
41 if event.Kind != "commit" || event.Commit == nil {
42 return nil
43 }
44
45 commit := event.Commit
46
47 // Only handle post record creation for now
48 // UPDATE and DELETE will be added when we implement those features
49 if commit.Collection == "social.coves.post.record" && commit.Operation == "create" {
50 return c.createPost(ctx, event.Did, commit)
51 }
52
53 // Silently ignore other operations (update, delete) and other collections
54 return nil
55}
56
57// createPost indexes a new post from the firehose
58func (c *PostEventConsumer) createPost(ctx context.Context, repoDID string, commit *CommitEvent) error {
59 if commit.Record == nil {
60 return fmt.Errorf("post create event missing record data")
61 }
62
63 // Parse the post record
64 postRecord, err := parsePostRecord(commit.Record)
65 if err != nil {
66 return fmt.Errorf("failed to parse post record: %w", err)
67 }
68
69 // SECURITY: Validate this is a legitimate post event
70 if err := c.validatePostEvent(ctx, repoDID, postRecord); err != nil {
71 log.Printf("🚨 SECURITY: Rejecting post event: %v", err)
72 return err
73 }
74
75 // Build AT-URI for this post
76 // Format: at://community_did/social.coves.post.record/rkey
77 uri := fmt.Sprintf("at://%s/social.coves.post.record/%s", repoDID, commit.RKey)
78
79 // Parse timestamp from record
80 createdAt, err := time.Parse(time.RFC3339, postRecord.CreatedAt)
81 if err != nil {
82 // Fallback to current time if parsing fails
83 log.Printf("Warning: Failed to parse createdAt timestamp, using current time: %v", err)
84 createdAt = time.Now()
85 }
86
87 // Build post entity
88 post := &posts.Post{
89 URI: uri,
90 CID: commit.CID,
91 RKey: commit.RKey,
92 AuthorDID: postRecord.Author,
93 CommunityDID: postRecord.Community,
94 Title: postRecord.Title,
95 Content: postRecord.Content,
96 CreatedAt: createdAt,
97 IndexedAt: time.Now(),
98 // Stats remain at 0 (no votes yet)
99 UpvoteCount: 0,
100 DownvoteCount: 0,
101 Score: 0,
102 CommentCount: 0,
103 }
104
105 // Serialize JSON fields (facets, embed, labels)
106 if postRecord.Facets != nil {
107 facetsJSON, marshalErr := json.Marshal(postRecord.Facets)
108 if marshalErr == nil {
109 facetsStr := string(facetsJSON)
110 post.ContentFacets = &facetsStr
111 }
112 }
113
114 if postRecord.Embed != nil {
115 embedJSON, marshalErr := json.Marshal(postRecord.Embed)
116 if marshalErr == nil {
117 embedStr := string(embedJSON)
118 post.Embed = &embedStr
119 }
120 }
121
122 if len(postRecord.ContentLabels) > 0 {
123 labelsJSON, marshalErr := json.Marshal(postRecord.ContentLabels)
124 if marshalErr == nil {
125 labelsStr := string(labelsJSON)
126 post.ContentLabels = &labelsStr
127 }
128 }
129
130 // Index in AppView database (idempotent - safe for Jetstream replays)
131 err = c.postRepo.Create(ctx, post)
132 if err != nil {
133 // Check if it already exists (idempotency)
134 if posts.IsConflict(err) {
135 log.Printf("Post already indexed: %s", uri)
136 return nil
137 }
138 return fmt.Errorf("failed to index post: %w", err)
139 }
140
141 log.Printf("✓ Indexed post: %s (author: %s, community: %s, rkey: %s)",
142 uri, post.AuthorDID, post.CommunityDID, commit.RKey)
143 return nil
144}
145
146// validatePostEvent performs security validation on post events
147// This prevents malicious actors from indexing fake posts
148func (c *PostEventConsumer) validatePostEvent(ctx context.Context, repoDID string, post *PostRecordFromJetstream) error {
149 // CRITICAL SECURITY CHECK:
150 // Posts MUST come from community repositories, not user repositories
151 // This prevents users from creating posts that appear to be from communities they don't control
152 //
153 // Example attack prevented:
154 // - User creates post in their own repo (at://user_did/social.coves.post.record/xyz)
155 // - Claims it's for community X (community field = community_did)
156 // - Without this check, fake post would be indexed
157 //
158 // With this check:
159 // - We verify event.Did (repo owner) == post.community (claimed community)
160 // - Reject if mismatch
161 if repoDID != post.Community {
162 return fmt.Errorf("repository DID (%s) doesn't match community DID (%s) - posts must come from community repos",
163 repoDID, post.Community)
164 }
165
166 // CRITICAL: Verify community exists in AppView
167 // Posts MUST reference valid communities (enforced by FK constraint)
168 // If community isn't indexed yet, we must reject the post
169 // Jetstream will replay events, so the post will be indexed once community is ready
170 _, err := c.communityRepo.GetByDID(ctx, post.Community)
171 if err != nil {
172 if communities.IsNotFound(err) {
173 // Reject - community must be indexed before posts
174 // This maintains referential integrity and prevents orphaned posts
175 return fmt.Errorf("community not found: %s - cannot index post before community", post.Community)
176 }
177 // Database error or other issue
178 return fmt.Errorf("failed to verify community exists: %w", err)
179 }
180
181 // CRITICAL: Verify author exists in AppView
182 // Every post MUST have a valid author (enforced by FK constraint)
183 // Even though posts live in community repos, they belong to specific authors
184 // If author isn't indexed yet, we must reject the post
185 _, err = c.userService.GetUserByDID(ctx, post.Author)
186 if err != nil {
187 // Check if it's a "not found" error using string matching
188 // (users package doesn't export IsNotFound)
189 if err.Error() == "user not found" || strings.Contains(err.Error(), "not found") {
190 // Reject - author must be indexed before posts
191 // This maintains referential integrity and prevents orphaned posts
192 return fmt.Errorf("author not found: %s - cannot index post before author", post.Author)
193 }
194 // Database error or other issue
195 return fmt.Errorf("failed to verify author exists: %w", err)
196 }
197
198 return nil
199}
200
201// PostRecordFromJetstream represents a post record as received from Jetstream
202// Matches the structure written to PDS via social.coves.post.record
203type PostRecordFromJetstream struct {
204 OriginalAuthor interface{} `json:"originalAuthor,omitempty"`
205 FederatedFrom interface{} `json:"federatedFrom,omitempty"`
206 Location interface{} `json:"location,omitempty"`
207 Title *string `json:"title,omitempty"`
208 Content *string `json:"content,omitempty"`
209 Embed map[string]interface{} `json:"embed,omitempty"`
210 Type string `json:"$type"`
211 Community string `json:"community"`
212 Author string `json:"author"`
213 CreatedAt string `json:"createdAt"`
214 Facets []interface{} `json:"facets,omitempty"`
215 ContentLabels []string `json:"contentLabels,omitempty"`
216}
217
218// parsePostRecord converts a raw Jetstream record map to a PostRecordFromJetstream
219func parsePostRecord(record map[string]interface{}) (*PostRecordFromJetstream, error) {
220 // Marshal to JSON and back to ensure proper type conversion
221 recordJSON, err := json.Marshal(record)
222 if err != nil {
223 return nil, fmt.Errorf("failed to marshal record: %w", err)
224 }
225
226 var post PostRecordFromJetstream
227 if err := json.Unmarshal(recordJSON, &post); err != nil {
228 return nil, fmt.Errorf("failed to unmarshal post record: %w", err)
229 }
230
231 // Validate required fields
232 if post.Community == "" {
233 return nil, fmt.Errorf("post record missing community field")
234 }
235 if post.Author == "" {
236 return nil, fmt.Errorf("post record missing author field")
237 }
238 if post.CreatedAt == "" {
239 return nil, fmt.Errorf("post record missing createdAt field")
240 }
241
242 return &post, nil
243}