A community-based topic aggregation platform built on atproto
1package jetstream
2
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"strings"
	"time"

	"Coves/internal/atproto/identity"
	"Coves/internal/core/users"
	"github.com/gorilla/websocket"
)
14
15// JetstreamEvent represents an event from the Jetstream firehose
16// Jetstream documentation: https://docs.bsky.app/docs/advanced-guides/jetstream
17type JetstreamEvent struct {
18 Did string `json:"did"`
19 TimeUS int64 `json:"time_us"`
20 Kind string `json:"kind"` // "account", "commit", "identity"
21 Account *AccountEvent `json:"account,omitempty"`
22 Identity *IdentityEvent `json:"identity,omitempty"`
23}
24
// AccountEvent is the payload found under the "account" key of a Jetstream event.
type AccountEvent struct {
	// Active is decoded from "active".
	Active bool `json:"active"`
	// Did is the DID the account event refers to.
	Did string `json:"did"`
	// Seq is decoded from "seq".
	Seq int64 `json:"seq"`
	// Time is decoded from "time" and kept as the raw string.
	Time string `json:"time"`
}
31
// IdentityEvent is the payload found under the "identity" key of a Jetstream event.
type IdentityEvent struct {
	// Did is the DID whose identity data is being announced.
	Did string `json:"did"`
	// Handle is the handle reported for Did by this event.
	Handle string `json:"handle"`
	// Seq is decoded from "seq".
	Seq int64 `json:"seq"`
	// Time is decoded from "time" and kept as the raw string.
	Time string `json:"time"`
}
38
39// UserEventConsumer consumes user-related events from Jetstream
40type UserEventConsumer struct {
41 userService users.UserService
42 identityResolver identity.Resolver
43 wsURL string
44 pdsFilter string // Optional: only index users from specific PDS
45}
46
47// NewUserEventConsumer creates a new Jetstream consumer for user events
48func NewUserEventConsumer(userService users.UserService, identityResolver identity.Resolver, wsURL string, pdsFilter string) *UserEventConsumer {
49 return &UserEventConsumer{
50 userService: userService,
51 identityResolver: identityResolver,
52 wsURL: wsURL,
53 pdsFilter: pdsFilter,
54 }
55}
56
57// Start begins consuming events from Jetstream
58// Runs indefinitely, reconnecting on errors
59func (c *UserEventConsumer) Start(ctx context.Context) error {
60 log.Printf("Starting Jetstream user consumer: %s", c.wsURL)
61
62 for {
63 select {
64 case <-ctx.Done():
65 log.Println("Jetstream consumer shutting down")
66 return ctx.Err()
67 default:
68 if err := c.connect(ctx); err != nil {
69 log.Printf("Jetstream connection error: %v. Retrying in 5s...", err)
70 time.Sleep(5 * time.Second)
71 continue
72 }
73 }
74 }
75}
76
77// connect establishes WebSocket connection and processes events
78func (c *UserEventConsumer) connect(ctx context.Context) error {
79 conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.wsURL, nil)
80 if err != nil {
81 return fmt.Errorf("failed to connect to Jetstream: %w", err)
82 }
83 defer conn.Close()
84
85 log.Println("Connected to Jetstream")
86
87 // Set read deadline to detect connection issues
88 conn.SetReadDeadline(time.Now().Add(60 * time.Second))
89
90 // Set pong handler to keep connection alive
91 conn.SetPongHandler(func(string) error {
92 conn.SetReadDeadline(time.Now().Add(60 * time.Second))
93 return nil
94 })
95
96 // Start ping ticker
97 ticker := time.NewTicker(30 * time.Second)
98 defer ticker.Stop()
99
100 done := make(chan struct{})
101
102 // Goroutine to send pings
103 // TODO: Fix race condition - multiple goroutines can call close(done) concurrently
104 // Use sync.Once to ensure close(done) is called exactly once
105 // See PR review issue #4
106 go func() {
107 for {
108 select {
109 case <-ticker.C:
110 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
111 log.Printf("Ping error: %v", err)
112 close(done)
113 return
114 }
115 case <-done:
116 return
117 case <-ctx.Done():
118 return
119 }
120 }
121 }()
122
123 // Read messages
124 for {
125 select {
126 case <-ctx.Done():
127 return ctx.Err()
128 case <-done:
129 return fmt.Errorf("connection closed")
130 default:
131 _, message, err := conn.ReadMessage()
132 if err != nil {
133 close(done)
134 return fmt.Errorf("read error: %w", err)
135 }
136
137 // Reset read deadline on successful read
138 conn.SetReadDeadline(time.Now().Add(60 * time.Second))
139
140 if err := c.handleEvent(ctx, message); err != nil {
141 log.Printf("Error handling event: %v", err)
142 // Continue processing other events
143 }
144 }
145 }
146}
147
148// handleEvent processes a single Jetstream event
149func (c *UserEventConsumer) handleEvent(ctx context.Context, data []byte) error {
150 var event JetstreamEvent
151 if err := json.Unmarshal(data, &event); err != nil {
152 return fmt.Errorf("failed to parse event: %w", err)
153 }
154
155 // We're interested in identity events (handle updates) and account events (new users)
156 switch event.Kind {
157 case "identity":
158 return c.handleIdentityEvent(ctx, &event)
159 case "account":
160 return c.handleAccountEvent(ctx, &event)
161 default:
162 // Ignore other event types (commits, etc.)
163 return nil
164 }
165}
166
167// HandleIdentityEventPublic is a public wrapper for testing
168func (c *UserEventConsumer) HandleIdentityEventPublic(ctx context.Context, event *JetstreamEvent) error {
169 return c.handleIdentityEvent(ctx, event)
170}
171
172// handleIdentityEvent processes identity events (handle changes)
173func (c *UserEventConsumer) handleIdentityEvent(ctx context.Context, event *JetstreamEvent) error {
174 if event.Identity == nil {
175 return fmt.Errorf("identity event missing identity data")
176 }
177
178 did := event.Identity.Did
179 handle := event.Identity.Handle
180
181 if did == "" || handle == "" {
182 return fmt.Errorf("identity event missing did or handle")
183 }
184
185 log.Printf("Identity event: %s → %s", did, handle)
186
187 // Get existing user to check if handle changed
188 existingUser, err := c.userService.GetUserByDID(ctx, did)
189 if err != nil {
190 // User doesn't exist - create new user
191 pdsURL := "https://bsky.social" // Default Bluesky PDS
192 // TODO: Resolve PDS URL from DID document via PLC directory
193
194 _, createErr := c.userService.CreateUser(ctx, users.CreateUserRequest{
195 DID: did,
196 Handle: handle,
197 PDSURL: pdsURL,
198 })
199
200 if createErr != nil && !isDuplicateError(createErr) {
201 return fmt.Errorf("failed to create user: %w", createErr)
202 }
203
204 log.Printf("Indexed new user: %s (%s)", handle, did)
205 return nil
206 }
207
208 // User exists - check if handle changed
209 if existingUser.Handle != handle {
210 log.Printf("Handle changed: %s → %s (DID: %s)", existingUser.Handle, handle, did)
211
212 // CRITICAL: Update database FIRST, then purge cache
213 // This prevents race condition where cache gets refilled with stale data
214 _, updateErr := c.userService.UpdateHandle(ctx, did, handle)
215 if updateErr != nil {
216 return fmt.Errorf("failed to update handle: %w", updateErr)
217 }
218
219 // CRITICAL: Purge BOTH old handle and DID from cache
220 // Old handle: alice.bsky.social → did:plc:abc123 (must be removed)
221 if purgeErr := c.identityResolver.Purge(ctx, existingUser.Handle); purgeErr != nil {
222 log.Printf("Warning: failed to purge old handle cache for %s: %v", existingUser.Handle, purgeErr)
223 }
224
225 // DID: did:plc:abc123 → alice.bsky.social (must be removed)
226 if purgeErr := c.identityResolver.Purge(ctx, did); purgeErr != nil {
227 log.Printf("Warning: failed to purge DID cache for %s: %v", did, purgeErr)
228 }
229
230 log.Printf("Updated handle and purged cache: %s → %s", existingUser.Handle, handle)
231 } else {
232 log.Printf("Handle unchanged for %s (%s)", handle, did)
233 }
234
235 return nil
236}
237
238// handleAccountEvent processes account events (account creation/updates)
239func (c *UserEventConsumer) handleAccountEvent(ctx context.Context, event *JetstreamEvent) error {
240 if event.Account == nil {
241 return fmt.Errorf("account event missing account data")
242 }
243
244 did := event.Account.Did
245 if did == "" {
246 return fmt.Errorf("account event missing did")
247 }
248
249 // Account events don't include handle, so we can't index yet
250 // We'll wait for the corresponding identity event
251 log.Printf("Account event for %s (waiting for identity event)", did)
252 return nil
253}
254
// isDuplicateError reports whether err looks like a duplicate DID/handle
// failure, judged by its message containing "already exists", "already taken"
// or "duplicate". A nil error is never a duplicate.
//
// NOTE(review): matching on message text is fragile; if the users package
// exposes sentinel errors, errors.Is would be the sturdier check — confirm.
func isDuplicateError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	for _, marker := range []string{"already exists", "already taken", "duplicate"} {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
263
// contains reports whether substr occurs within s. It is kept as a thin
// wrapper around strings.Contains for existing callers in this package.
func contains(s, substr string) bool {
	return strings.Contains(s, substr)
}
267
// anySubstring slides a window of len(substr) bytes across s and reports
// whether any window equals substr. When substr is longer than s no window
// fits and the result is false.
func anySubstring(s, substr string) bool {
	width := len(substr)
	for end := width; end <= len(s); end++ {
		if s[end-width:end] == substr {
			return true
		}
	}
	return false
}