A community-based topic aggregation platform built on atproto
1package jetstream
2
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"Coves/internal/atproto/identity"
	"Coves/internal/core/users"

	"github.com/gorilla/websocket"
)
16
17// JetstreamEvent represents an event from the Jetstream firehose
18// Jetstream documentation: https://docs.bsky.app/docs/advanced-guides/jetstream
19type JetstreamEvent struct {
20 Account *AccountEvent `json:"account,omitempty"`
21 Identity *IdentityEvent `json:"identity,omitempty"`
22 Commit *CommitEvent `json:"commit,omitempty"`
23 Did string `json:"did"`
24 Kind string `json:"kind"`
25 TimeUS int64 `json:"time_us"`
26}
27
// AccountEvent is the payload carried under the "account" key of a
// JetstreamEvent.
type AccountEvent struct {
	// Did identifies the account the event refers to.
	Did string `json:"did"`
	// Time is the "time" value, kept as the raw string from the wire.
	Time string `json:"time"`
	// Seq is the "seq" value of the event.
	Seq int64 `json:"seq"`
	// Active is the "active" flag of the event.
	Active bool `json:"active"`
}
34
// IdentityEvent is the payload carried under the "identity" key of a
// JetstreamEvent. The consumer uses it to index users and track handle changes.
type IdentityEvent struct {
	// Did identifies the user the event refers to.
	Did string `json:"did"`
	// Handle is the handle reported for Did by this event.
	Handle string `json:"handle"`
	// Time is the "time" value, kept as the raw string from the wire.
	Time string `json:"time"`
	// Seq is the "seq" value of the event.
	Seq int64 `json:"seq"`
}
41
// CommitEvent is the payload carried under the "commit" key of a
// JetstreamEvent and describes a single record commit.
type CommitEvent struct {
	// Rev is the "rev" value of the commit.
	Rev string `json:"rev"`
	// Operation is one of "create", "update", or "delete".
	Operation string `json:"operation"`
	// Collection is the "collection" value of the commit.
	Collection string `json:"collection"`
	// RKey is the "rkey" value of the commit.
	RKey string `json:"rkey"`
	// Record is the decoded record body; omitted from JSON when empty.
	Record map[string]interface{} `json:"record,omitempty"`
	// CID is the "cid" value; omitted from JSON when empty.
	CID string `json:"cid,omitempty"`
}
51
52// UserEventConsumer consumes user-related events from Jetstream
53type UserEventConsumer struct {
54 userService users.UserService
55 identityResolver identity.Resolver
56 wsURL string
57 pdsFilter string // Optional: only index users from specific PDS
58}
59
60// NewUserEventConsumer creates a new Jetstream consumer for user events
61func NewUserEventConsumer(userService users.UserService, identityResolver identity.Resolver, wsURL, pdsFilter string) *UserEventConsumer {
62 return &UserEventConsumer{
63 userService: userService,
64 identityResolver: identityResolver,
65 wsURL: wsURL,
66 pdsFilter: pdsFilter,
67 }
68}
69
70// Start begins consuming events from Jetstream
71// Runs indefinitely, reconnecting on errors
72func (c *UserEventConsumer) Start(ctx context.Context) error {
73 log.Printf("Starting Jetstream user consumer: %s", c.wsURL)
74
75 for {
76 select {
77 case <-ctx.Done():
78 log.Println("Jetstream consumer shutting down")
79 return ctx.Err()
80 default:
81 if err := c.connect(ctx); err != nil {
82 log.Printf("Jetstream connection error: %v. Retrying in 5s...", err)
83 time.Sleep(5 * time.Second)
84 continue
85 }
86 }
87 }
88}
89
90// connect establishes WebSocket connection and processes events
91func (c *UserEventConsumer) connect(ctx context.Context) error {
92 conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.wsURL, nil)
93 if err != nil {
94 return fmt.Errorf("failed to connect to Jetstream: %w", err)
95 }
96 defer func() {
97 if err := conn.Close(); err != nil {
98 log.Printf("Failed to close WebSocket connection: %v", err)
99 }
100 }()
101
102 log.Println("Connected to Jetstream")
103
104 // Set read deadline to detect connection issues
105 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
106 log.Printf("Failed to set read deadline: %v", err)
107 }
108
109 // Set pong handler to keep connection alive
110 conn.SetPongHandler(func(string) error {
111 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
112 log.Printf("Failed to set read deadline in pong handler: %v", err)
113 }
114 return nil
115 })
116
117 // Start ping ticker
118 ticker := time.NewTicker(30 * time.Second)
119 defer ticker.Stop()
120
121 done := make(chan struct{})
122 var closeOnce sync.Once // Ensure done channel is only closed once
123
124 // Goroutine to send pings
125 go func() {
126 for {
127 select {
128 case <-ticker.C:
129 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
130 log.Printf("Ping error: %v", err)
131 closeOnce.Do(func() { close(done) })
132 return
133 }
134 case <-done:
135 return
136 case <-ctx.Done():
137 return
138 }
139 }
140 }()
141
142 // Read messages
143 for {
144 select {
145 case <-ctx.Done():
146 return ctx.Err()
147 case <-done:
148 return fmt.Errorf("connection closed")
149 default:
150 _, message, err := conn.ReadMessage()
151 if err != nil {
152 closeOnce.Do(func() { close(done) })
153 return fmt.Errorf("read error: %w", err)
154 }
155
156 // Reset read deadline on successful read
157 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
158 log.Printf("Failed to set read deadline: %v", err)
159 }
160
161 if err := c.handleEvent(ctx, message); err != nil {
162 log.Printf("Error handling event: %v", err)
163 // Continue processing other events
164 }
165 }
166 }
167}
168
169// handleEvent processes a single Jetstream event
170func (c *UserEventConsumer) handleEvent(ctx context.Context, data []byte) error {
171 var event JetstreamEvent
172 if err := json.Unmarshal(data, &event); err != nil {
173 return fmt.Errorf("failed to parse event: %w", err)
174 }
175
176 // We're interested in identity events (handle updates) and account events (new users)
177 switch event.Kind {
178 case "identity":
179 return c.handleIdentityEvent(ctx, &event)
180 case "account":
181 return c.handleAccountEvent(ctx, &event)
182 default:
183 // Ignore other event types (commits, etc.)
184 return nil
185 }
186}
187
188// HandleIdentityEventPublic is a public wrapper for testing
189func (c *UserEventConsumer) HandleIdentityEventPublic(ctx context.Context, event *JetstreamEvent) error {
190 return c.handleIdentityEvent(ctx, event)
191}
192
193// handleIdentityEvent processes identity events (handle changes)
194func (c *UserEventConsumer) handleIdentityEvent(ctx context.Context, event *JetstreamEvent) error {
195 if event.Identity == nil {
196 return fmt.Errorf("identity event missing identity data")
197 }
198
199 did := event.Identity.Did
200 handle := event.Identity.Handle
201
202 if did == "" || handle == "" {
203 return fmt.Errorf("identity event missing did or handle")
204 }
205
206 log.Printf("Identity event: %s → %s", did, handle)
207
208 // Get existing user to check if handle changed
209 existingUser, err := c.userService.GetUserByDID(ctx, did)
210 if err != nil {
211 // User doesn't exist - create new user
212 pdsURL := "https://bsky.social" // Default Bluesky PDS
213 // TODO: Resolve PDS URL from DID document via PLC directory
214
215 _, createErr := c.userService.CreateUser(ctx, users.CreateUserRequest{
216 DID: did,
217 Handle: handle,
218 PDSURL: pdsURL,
219 })
220
221 if createErr != nil && !isDuplicateError(createErr) {
222 return fmt.Errorf("failed to create user: %w", createErr)
223 }
224
225 log.Printf("Indexed new user: %s (%s)", handle, did)
226 return nil
227 }
228
229 // User exists - check if handle changed
230 if existingUser.Handle != handle {
231 log.Printf("Handle changed: %s → %s (DID: %s)", existingUser.Handle, handle, did)
232
233 // CRITICAL: Update database FIRST, then purge cache
234 // This prevents race condition where cache gets refilled with stale data
235 _, updateErr := c.userService.UpdateHandle(ctx, did, handle)
236 if updateErr != nil {
237 return fmt.Errorf("failed to update handle: %w", updateErr)
238 }
239
240 // CRITICAL: Purge BOTH old handle and DID from cache
241 // Old handle: alice.bsky.social → did:plc:abc123 (must be removed)
242 if purgeErr := c.identityResolver.Purge(ctx, existingUser.Handle); purgeErr != nil {
243 log.Printf("Warning: failed to purge old handle cache for %s: %v", existingUser.Handle, purgeErr)
244 }
245
246 // DID: did:plc:abc123 → alice.bsky.social (must be removed)
247 if purgeErr := c.identityResolver.Purge(ctx, did); purgeErr != nil {
248 log.Printf("Warning: failed to purge DID cache for %s: %v", did, purgeErr)
249 }
250
251 log.Printf("Updated handle and purged cache: %s → %s", existingUser.Handle, handle)
252 } else {
253 log.Printf("Handle unchanged for %s (%s)", handle, did)
254 }
255
256 return nil
257}
258
259// handleAccountEvent processes account events (account creation/updates)
260func (c *UserEventConsumer) handleAccountEvent(ctx context.Context, event *JetstreamEvent) error {
261 if event.Account == nil {
262 return fmt.Errorf("account event missing account data")
263 }
264
265 did := event.Account.Did
266 if did == "" {
267 return fmt.Errorf("account event missing did")
268 }
269
270 // Account events don't include handle, so we can't index yet
271 // We'll wait for the corresponding identity event
272 log.Printf("Account event for %s (waiting for identity event)", did)
273 return nil
274}
275
// isDuplicateError reports whether err looks like a duplicate DID/handle
// failure, judged by its message containing "already exists", "already taken",
// or "duplicate". The match is case-sensitive. A nil err is never a duplicate.
func isDuplicateError(err error) bool {
	if err == nil {
		return false
	}

	msg := err.Error()
	for _, marker := range []string{"already exists", "already taken", "duplicate"} {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
284
// contains reports whether substr occurs anywhere within s. An empty substr is
// contained in every string, including the empty string.
func contains(s, substr string) bool {
	return strings.Contains(s, substr)
}
288
// anySubstring reports whether substr appears in s, comparing the window of
// len(substr) bytes at every starting offset. When substr is longer than s no
// window fits and the result is false.
func anySubstring(s, substr string) bool {
	width := len(substr)
	for start := 0; start+width <= len(s); start++ {
		if s[start:start+width] == substr {
			return true
		}
	}
	return false
}