A community-based topic aggregation platform built on atproto
1package jetstream
2
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"strings"
	"time"

	"Coves/internal/atproto/identity"
	"Coves/internal/core/users"
	"github.com/gorilla/websocket"
)
14
// Wire types for messages received from the Jetstream firehose.
// Jetstream documentation: https://docs.bsky.app/docs/advanced-guides/jetstream
type (
	// JetstreamEvent is the envelope shared by every Jetstream message.
	// Kind says which one of Account, Identity or Commit carries the payload;
	// the other two are left nil when absent from the JSON.
	JetstreamEvent struct {
		Did      string         `json:"did"`
		TimeUS   int64          `json:"time_us"`
		Kind     string         `json:"kind"` // "account", "commit", "identity"
		Account  *AccountEvent  `json:"account,omitempty"`
		Identity *IdentityEvent `json:"identity,omitempty"`
		Commit   *CommitEvent   `json:"commit,omitempty"`
	}

	// AccountEvent is the payload of a "account" event: the account's DID,
	// whether it is active, and the relay sequence number and timestamp.
	AccountEvent struct {
		Active bool   `json:"active"`
		Did    string `json:"did"`
		Seq    int64  `json:"seq"`
		Time   string `json:"time"`
	}

	// IdentityEvent is the payload of an "identity" event, pairing a DID with
	// its current handle plus the relay sequence number and timestamp.
	IdentityEvent struct {
		Did    string `json:"did"`
		Handle string `json:"handle"`
		Seq    int64  `json:"seq"`
		Time   string `json:"time"`
	}

	// CommitEvent is the payload of a "commit" event: one record operation
	// within a repository collection. Record and CID are omitted when empty.
	CommitEvent struct {
		Rev        string                 `json:"rev"`
		Operation  string                 `json:"operation"` // "create", "update", "delete"
		Collection string                 `json:"collection"`
		RKey       string                 `json:"rkey"`
		Record     map[string]interface{} `json:"record,omitempty"`
		CID        string                 `json:"cid,omitempty"`
	}
)
49
50// UserEventConsumer consumes user-related events from Jetstream
51type UserEventConsumer struct {
52 userService users.UserService
53 identityResolver identity.Resolver
54 wsURL string
55 pdsFilter string // Optional: only index users from specific PDS
56}
57
58// NewUserEventConsumer creates a new Jetstream consumer for user events
59func NewUserEventConsumer(userService users.UserService, identityResolver identity.Resolver, wsURL string, pdsFilter string) *UserEventConsumer {
60 return &UserEventConsumer{
61 userService: userService,
62 identityResolver: identityResolver,
63 wsURL: wsURL,
64 pdsFilter: pdsFilter,
65 }
66}
67
68// Start begins consuming events from Jetstream
69// Runs indefinitely, reconnecting on errors
70func (c *UserEventConsumer) Start(ctx context.Context) error {
71 log.Printf("Starting Jetstream user consumer: %s", c.wsURL)
72
73 for {
74 select {
75 case <-ctx.Done():
76 log.Println("Jetstream consumer shutting down")
77 return ctx.Err()
78 default:
79 if err := c.connect(ctx); err != nil {
80 log.Printf("Jetstream connection error: %v. Retrying in 5s...", err)
81 time.Sleep(5 * time.Second)
82 continue
83 }
84 }
85 }
86}
87
88// connect establishes WebSocket connection and processes events
89func (c *UserEventConsumer) connect(ctx context.Context) error {
90 conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.wsURL, nil)
91 if err != nil {
92 return fmt.Errorf("failed to connect to Jetstream: %w", err)
93 }
94 defer conn.Close()
95
96 log.Println("Connected to Jetstream")
97
98 // Set read deadline to detect connection issues
99 conn.SetReadDeadline(time.Now().Add(60 * time.Second))
100
101 // Set pong handler to keep connection alive
102 conn.SetPongHandler(func(string) error {
103 conn.SetReadDeadline(time.Now().Add(60 * time.Second))
104 return nil
105 })
106
107 // Start ping ticker
108 ticker := time.NewTicker(30 * time.Second)
109 defer ticker.Stop()
110
111 done := make(chan struct{})
112
113 // Goroutine to send pings
114 // TODO: Fix race condition - multiple goroutines can call close(done) concurrently
115 // Use sync.Once to ensure close(done) is called exactly once
116 // See PR review issue #4
117 go func() {
118 for {
119 select {
120 case <-ticker.C:
121 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
122 log.Printf("Ping error: %v", err)
123 close(done)
124 return
125 }
126 case <-done:
127 return
128 case <-ctx.Done():
129 return
130 }
131 }
132 }()
133
134 // Read messages
135 for {
136 select {
137 case <-ctx.Done():
138 return ctx.Err()
139 case <-done:
140 return fmt.Errorf("connection closed")
141 default:
142 _, message, err := conn.ReadMessage()
143 if err != nil {
144 close(done)
145 return fmt.Errorf("read error: %w", err)
146 }
147
148 // Reset read deadline on successful read
149 conn.SetReadDeadline(time.Now().Add(60 * time.Second))
150
151 if err := c.handleEvent(ctx, message); err != nil {
152 log.Printf("Error handling event: %v", err)
153 // Continue processing other events
154 }
155 }
156 }
157}
158
159// handleEvent processes a single Jetstream event
160func (c *UserEventConsumer) handleEvent(ctx context.Context, data []byte) error {
161 var event JetstreamEvent
162 if err := json.Unmarshal(data, &event); err != nil {
163 return fmt.Errorf("failed to parse event: %w", err)
164 }
165
166 // We're interested in identity events (handle updates) and account events (new users)
167 switch event.Kind {
168 case "identity":
169 return c.handleIdentityEvent(ctx, &event)
170 case "account":
171 return c.handleAccountEvent(ctx, &event)
172 default:
173 // Ignore other event types (commits, etc.)
174 return nil
175 }
176}
177
178// HandleIdentityEventPublic is a public wrapper for testing
179func (c *UserEventConsumer) HandleIdentityEventPublic(ctx context.Context, event *JetstreamEvent) error {
180 return c.handleIdentityEvent(ctx, event)
181}
182
183// handleIdentityEvent processes identity events (handle changes)
184func (c *UserEventConsumer) handleIdentityEvent(ctx context.Context, event *JetstreamEvent) error {
185 if event.Identity == nil {
186 return fmt.Errorf("identity event missing identity data")
187 }
188
189 did := event.Identity.Did
190 handle := event.Identity.Handle
191
192 if did == "" || handle == "" {
193 return fmt.Errorf("identity event missing did or handle")
194 }
195
196 log.Printf("Identity event: %s → %s", did, handle)
197
198 // Get existing user to check if handle changed
199 existingUser, err := c.userService.GetUserByDID(ctx, did)
200 if err != nil {
201 // User doesn't exist - create new user
202 pdsURL := "https://bsky.social" // Default Bluesky PDS
203 // TODO: Resolve PDS URL from DID document via PLC directory
204
205 _, createErr := c.userService.CreateUser(ctx, users.CreateUserRequest{
206 DID: did,
207 Handle: handle,
208 PDSURL: pdsURL,
209 })
210
211 if createErr != nil && !isDuplicateError(createErr) {
212 return fmt.Errorf("failed to create user: %w", createErr)
213 }
214
215 log.Printf("Indexed new user: %s (%s)", handle, did)
216 return nil
217 }
218
219 // User exists - check if handle changed
220 if existingUser.Handle != handle {
221 log.Printf("Handle changed: %s → %s (DID: %s)", existingUser.Handle, handle, did)
222
223 // CRITICAL: Update database FIRST, then purge cache
224 // This prevents race condition where cache gets refilled with stale data
225 _, updateErr := c.userService.UpdateHandle(ctx, did, handle)
226 if updateErr != nil {
227 return fmt.Errorf("failed to update handle: %w", updateErr)
228 }
229
230 // CRITICAL: Purge BOTH old handle and DID from cache
231 // Old handle: alice.bsky.social → did:plc:abc123 (must be removed)
232 if purgeErr := c.identityResolver.Purge(ctx, existingUser.Handle); purgeErr != nil {
233 log.Printf("Warning: failed to purge old handle cache for %s: %v", existingUser.Handle, purgeErr)
234 }
235
236 // DID: did:plc:abc123 → alice.bsky.social (must be removed)
237 if purgeErr := c.identityResolver.Purge(ctx, did); purgeErr != nil {
238 log.Printf("Warning: failed to purge DID cache for %s: %v", did, purgeErr)
239 }
240
241 log.Printf("Updated handle and purged cache: %s → %s", existingUser.Handle, handle)
242 } else {
243 log.Printf("Handle unchanged for %s (%s)", handle, did)
244 }
245
246 return nil
247}
248
249// handleAccountEvent processes account events (account creation/updates)
250func (c *UserEventConsumer) handleAccountEvent(ctx context.Context, event *JetstreamEvent) error {
251 if event.Account == nil {
252 return fmt.Errorf("account event missing account data")
253 }
254
255 did := event.Account.Did
256 if did == "" {
257 return fmt.Errorf("account event missing did")
258 }
259
260 // Account events don't include handle, so we can't index yet
261 // We'll wait for the corresponding identity event
262 log.Printf("Account event for %s (waiting for identity event)", did)
263 return nil
264}
265
// isDuplicateError reports whether err looks like a uniqueness violation on a
// user's DID or handle. Callers use it to treat a replayed create as already
// applied.
//
// Matching is a case-sensitive substring check on err.Error() against
// "already exists", "already taken" and "duplicate". Wrapped errors match
// because their message includes the inner message. A nil err returns false.
// NOTE(review): prefer errors.Is against sentinels from the users package
// once they exist; message matching is brittle.
func isDuplicateError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	return strings.Contains(msg, "already exists") ||
		strings.Contains(msg, "already taken") ||
		strings.Contains(msg, "duplicate")
}

// contains reports whether substr occurs within s (an empty substr always
// matches). Kept as a thin wrapper for existing callers in this package;
// prefer strings.Contains directly.
func contains(s, substr string) bool {
	return strings.Contains(s, substr)
}

// anySubstring reports whether substr occurs anywhere in s (an empty substr
// always matches). Equivalent to strings.Contains, which replaces the
// previous hand-rolled O(len(s)·len(substr)) scan; retained for compatibility.
func anySubstring(s, substr string) bool {
	return strings.Contains(s, substr)
}