A community-based topic aggregation platform built on atproto
1package jetstream
2
import (
	"Coves/internal/atproto/identity"
	"Coves/internal/core/users"
	"context"
	"encoding/json"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/websocket"
)
15
16// JetstreamEvent represents an event from the Jetstream firehose
17// Jetstream documentation: https://docs.bsky.app/docs/advanced-guides/jetstream
18type JetstreamEvent struct {
19 Account *AccountEvent `json:"account,omitempty"`
20 Identity *IdentityEvent `json:"identity,omitempty"`
21 Commit *CommitEvent `json:"commit,omitempty"`
22 Did string `json:"did"`
23 Kind string `json:"kind"`
24 TimeUS int64 `json:"time_us"`
25}
26
// AccountEvent is the payload carried in JetstreamEvent.Account.
// handleAccountEvent only reads Did; the remaining fields are decoded but
// not otherwise used in this file.
type AccountEvent struct {
	// Did identifies the account the event is about.
	Did string `json:"did"`
	// Time is kept as the raw string received on the wire.
	Time string `json:"time"`
	// Seq is the numeric "seq" value received with the event.
	Seq int64 `json:"seq"`
	// Active is the "active" flag received with the event.
	Active bool `json:"active"`
}
33
// IdentityEvent is the payload carried in JetstreamEvent.Identity.
// handleIdentityEvent requires both Did and Handle to be non-empty.
type IdentityEvent struct {
	// Did identifies the user whose identity is being announced.
	Did string `json:"did"`
	// Handle is the handle announced for Did.
	Handle string `json:"handle"`
	// Time is kept as the raw string received on the wire.
	Time string `json:"time"`
	// Seq is the numeric "seq" value received with the event.
	Seq int64 `json:"seq"`
}
40
// CommitEvent is the payload carried in JetstreamEvent.Commit and describes a
// single record commit. Nothing in this file reads it; handleEvent ignores
// commit events.
type CommitEvent struct {
	// Rev is the "rev" string received with the commit.
	Rev string `json:"rev"`
	// Operation is one of "create", "update", "delete".
	Operation string `json:"operation"`
	// Collection is the "collection" string received with the commit.
	Collection string `json:"collection"`
	// RKey is the "rkey" string received with the commit.
	RKey string `json:"rkey"`
	// Record is the record body decoded as generic JSON; omitted when empty.
	Record map[string]interface{} `json:"record,omitempty"`
	// CID is the "cid" string received with the commit; omitted when empty.
	CID string `json:"cid,omitempty"`
}
50
51// UserEventConsumer consumes user-related events from Jetstream
52type UserEventConsumer struct {
53 userService users.UserService
54 identityResolver identity.Resolver
55 wsURL string
56 pdsFilter string // Optional: only index users from specific PDS
57}
58
59// NewUserEventConsumer creates a new Jetstream consumer for user events
60func NewUserEventConsumer(userService users.UserService, identityResolver identity.Resolver, wsURL, pdsFilter string) *UserEventConsumer {
61 return &UserEventConsumer{
62 userService: userService,
63 identityResolver: identityResolver,
64 wsURL: wsURL,
65 pdsFilter: pdsFilter,
66 }
67}
68
69// Start begins consuming events from Jetstream
70// Runs indefinitely, reconnecting on errors
71func (c *UserEventConsumer) Start(ctx context.Context) error {
72 log.Printf("Starting Jetstream user consumer: %s", c.wsURL)
73
74 for {
75 select {
76 case <-ctx.Done():
77 log.Println("Jetstream consumer shutting down")
78 return ctx.Err()
79 default:
80 if err := c.connect(ctx); err != nil {
81 log.Printf("Jetstream connection error: %v. Retrying in 5s...", err)
82 time.Sleep(5 * time.Second)
83 continue
84 }
85 }
86 }
87}
88
89// connect establishes WebSocket connection and processes events
90func (c *UserEventConsumer) connect(ctx context.Context) error {
91 conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.wsURL, nil)
92 if err != nil {
93 return fmt.Errorf("failed to connect to Jetstream: %w", err)
94 }
95 defer func() {
96 if err := conn.Close(); err != nil {
97 log.Printf("Failed to close WebSocket connection: %v", err)
98 }
99 }()
100
101 log.Println("Connected to Jetstream")
102
103 // Set read deadline to detect connection issues
104 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
105 log.Printf("Failed to set read deadline: %v", err)
106 }
107
108 // Set pong handler to keep connection alive
109 conn.SetPongHandler(func(string) error {
110 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
111 log.Printf("Failed to set read deadline in pong handler: %v", err)
112 }
113 return nil
114 })
115
116 // Start ping ticker
117 ticker := time.NewTicker(30 * time.Second)
118 defer ticker.Stop()
119
120 done := make(chan struct{})
121 var closeOnce sync.Once // Ensure done channel is only closed once
122
123 // Goroutine to send pings
124 go func() {
125 for {
126 select {
127 case <-ticker.C:
128 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
129 log.Printf("Ping error: %v", err)
130 closeOnce.Do(func() { close(done) })
131 return
132 }
133 case <-done:
134 return
135 case <-ctx.Done():
136 return
137 }
138 }
139 }()
140
141 // Read messages
142 for {
143 select {
144 case <-ctx.Done():
145 return ctx.Err()
146 case <-done:
147 return fmt.Errorf("connection closed")
148 default:
149 _, message, err := conn.ReadMessage()
150 if err != nil {
151 closeOnce.Do(func() { close(done) })
152 return fmt.Errorf("read error: %w", err)
153 }
154
155 // Reset read deadline on successful read
156 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil {
157 log.Printf("Failed to set read deadline: %v", err)
158 }
159
160 if err := c.handleEvent(ctx, message); err != nil {
161 log.Printf("Error handling event: %v", err)
162 // Continue processing other events
163 }
164 }
165 }
166}
167
168// handleEvent processes a single Jetstream event
169func (c *UserEventConsumer) handleEvent(ctx context.Context, data []byte) error {
170 var event JetstreamEvent
171 if err := json.Unmarshal(data, &event); err != nil {
172 return fmt.Errorf("failed to parse event: %w", err)
173 }
174
175 // We're interested in identity events (handle updates) and account events (new users)
176 switch event.Kind {
177 case "identity":
178 return c.handleIdentityEvent(ctx, &event)
179 case "account":
180 return c.handleAccountEvent(ctx, &event)
181 default:
182 // Ignore other event types (commits, etc.)
183 return nil
184 }
185}
186
187// HandleIdentityEventPublic is a public wrapper for testing
188func (c *UserEventConsumer) HandleIdentityEventPublic(ctx context.Context, event *JetstreamEvent) error {
189 return c.handleIdentityEvent(ctx, event)
190}
191
192// handleIdentityEvent processes identity events (handle changes)
193func (c *UserEventConsumer) handleIdentityEvent(ctx context.Context, event *JetstreamEvent) error {
194 if event.Identity == nil {
195 return fmt.Errorf("identity event missing identity data")
196 }
197
198 did := event.Identity.Did
199 handle := event.Identity.Handle
200
201 if did == "" || handle == "" {
202 return fmt.Errorf("identity event missing did or handle")
203 }
204
205 log.Printf("Identity event: %s → %s", did, handle)
206
207 // Get existing user to check if handle changed
208 existingUser, err := c.userService.GetUserByDID(ctx, did)
209 if err != nil {
210 // User doesn't exist - create new user
211 pdsURL := "https://bsky.social" // Default Bluesky PDS
212 // TODO: Resolve PDS URL from DID document via PLC directory
213
214 _, createErr := c.userService.CreateUser(ctx, users.CreateUserRequest{
215 DID: did,
216 Handle: handle,
217 PDSURL: pdsURL,
218 })
219
220 if createErr != nil && !isDuplicateError(createErr) {
221 return fmt.Errorf("failed to create user: %w", createErr)
222 }
223
224 log.Printf("Indexed new user: %s (%s)", handle, did)
225 return nil
226 }
227
228 // User exists - check if handle changed
229 if existingUser.Handle != handle {
230 log.Printf("Handle changed: %s → %s (DID: %s)", existingUser.Handle, handle, did)
231
232 // CRITICAL: Update database FIRST, then purge cache
233 // This prevents race condition where cache gets refilled with stale data
234 _, updateErr := c.userService.UpdateHandle(ctx, did, handle)
235 if updateErr != nil {
236 return fmt.Errorf("failed to update handle: %w", updateErr)
237 }
238
239 // CRITICAL: Purge BOTH old handle and DID from cache
240 // Old handle: alice.bsky.social → did:plc:abc123 (must be removed)
241 if purgeErr := c.identityResolver.Purge(ctx, existingUser.Handle); purgeErr != nil {
242 log.Printf("Warning: failed to purge old handle cache for %s: %v", existingUser.Handle, purgeErr)
243 }
244
245 // DID: did:plc:abc123 → alice.bsky.social (must be removed)
246 if purgeErr := c.identityResolver.Purge(ctx, did); purgeErr != nil {
247 log.Printf("Warning: failed to purge DID cache for %s: %v", did, purgeErr)
248 }
249
250 log.Printf("Updated handle and purged cache: %s → %s", existingUser.Handle, handle)
251 } else {
252 log.Printf("Handle unchanged for %s (%s)", handle, did)
253 }
254
255 return nil
256}
257
258// handleAccountEvent processes account events (account creation/updates)
259func (c *UserEventConsumer) handleAccountEvent(ctx context.Context, event *JetstreamEvent) error {
260 if event.Account == nil {
261 return fmt.Errorf("account event missing account data")
262 }
263
264 did := event.Account.Did
265 if did == "" {
266 return fmt.Errorf("account event missing did")
267 }
268
269 // Account events don't include handle, so we can't index yet
270 // We'll wait for the corresponding identity event
271 log.Printf("Account event for %s (waiting for identity event)", did)
272 return nil
273}
274
// isDuplicateError reports whether err looks like a duplicate DID/handle
// failure. The check is a case-sensitive substring match of the error text
// against "already exists", "already taken", and "duplicate". A nil error is
// never a duplicate.
func isDuplicateError(err error) bool {
	if err == nil {
		return false
	}

	msg := err.Error()
	for _, marker := range []string{"already exists", "already taken", "duplicate"} {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
283
// contains reports whether substr occurs within s. It delegates to
// strings.Contains, so an empty substr is contained in every string,
// including the empty one.
func contains(s, substr string) bool {
	return strings.Contains(s, substr)
}
287
// anySubstring reports whether substr appears at any offset of s. It slides a
// window of len(substr) bytes across s and compares each window to substr.
// When substr is longer than s no window fits and the result is false; an
// empty substr matches at offset 0.
func anySubstring(s, substr string) bool {
	width := len(substr)
	for start, end := 0, width; end <= len(s); start, end = start+1, end+1 {
		if s[start:end] == substr {
			return true
		}
	}
	return false
}