A community-based topic aggregation platform built on atproto.
1package jetstream
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "time"
9
10 "Coves/internal/core/users"
11 "github.com/gorilla/websocket"
12)
13
type (
	// JetstreamEvent is the envelope of a single message read from the
	// Jetstream firehose.
	// Jetstream documentation: https://docs.bsky.app/docs/advanced-guides/jetstream
	//
	// Kind says which payload pointer is populated. Only the "account" and
	// "identity" payloads are modelled here; a "commit" payload has no field in
	// this struct, so json.Unmarshal silently drops it.
	JetstreamEvent struct {
		Did      string         `json:"did"`
		TimeUS   int64          `json:"time_us"` // presumably Unix microseconds, per the tag name — confirm against Jetstream docs
		Kind     string         `json:"kind"`    // "account", "commit", "identity"
		Account  *AccountEvent  `json:"account,omitempty"`
		Identity *IdentityEvent `json:"identity,omitempty"`
	}

	// AccountEvent is the payload attached to a Kind == "account" event.
	// Time is kept as the raw string from the wire; it is not parsed here.
	AccountEvent struct {
		Active bool   `json:"active"`
		Did    string `json:"did"`
		Seq    int64  `json:"seq"`
		Time   string `json:"time"`
	}

	// IdentityEvent is the payload attached to a Kind == "identity" event.
	// It is the only payload that carries a Handle. Time is kept as the raw
	// string from the wire.
	IdentityEvent struct {
		Did    string `json:"did"`
		Handle string `json:"handle"`
		Seq    int64  `json:"seq"`
		Time   string `json:"time"`
	}
)
37
// UserEventConsumer consumes user-related events from Jetstream and indexes
// users through a users.UserService. Build one with NewUserEventConsumer and
// run it with Start.
type UserEventConsumer struct {
	// userService receives a CreateUser call for every identity event that
	// carries both a DID and a handle.
	userService users.UserService
	// wsURL is the Jetstream WebSocket endpoint dialed on every (re)connect.
	wsURL string
	// NOTE(review): pdsFilter is stored but not read anywhere in this file, so
	// no PDS filtering is currently applied — confirm against the rest of the
	// package before relying on it.
	pdsFilter string // Optional: only index users from specific PDS
}
44
45// NewUserEventConsumer creates a new Jetstream consumer for user events
46func NewUserEventConsumer(userService users.UserService, wsURL string, pdsFilter string) *UserEventConsumer {
47 return &UserEventConsumer{
48 userService: userService,
49 wsURL: wsURL,
50 pdsFilter: pdsFilter,
51 }
52}
53
54// Start begins consuming events from Jetstream
55// Runs indefinitely, reconnecting on errors
56func (c *UserEventConsumer) Start(ctx context.Context) error {
57 log.Printf("Starting Jetstream user consumer: %s", c.wsURL)
58
59 for {
60 select {
61 case <-ctx.Done():
62 log.Println("Jetstream consumer shutting down")
63 return ctx.Err()
64 default:
65 if err := c.connect(ctx); err != nil {
66 log.Printf("Jetstream connection error: %v. Retrying in 5s...", err)
67 time.Sleep(5 * time.Second)
68 continue
69 }
70 }
71 }
72}
73
74// connect establishes WebSocket connection and processes events
75func (c *UserEventConsumer) connect(ctx context.Context) error {
76 conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.wsURL, nil)
77 if err != nil {
78 return fmt.Errorf("failed to connect to Jetstream: %w", err)
79 }
80 defer conn.Close()
81
82 log.Println("Connected to Jetstream")
83
84 // Set read deadline to detect connection issues
85 conn.SetReadDeadline(time.Now().Add(60 * time.Second))
86
87 // Set pong handler to keep connection alive
88 conn.SetPongHandler(func(string) error {
89 conn.SetReadDeadline(time.Now().Add(60 * time.Second))
90 return nil
91 })
92
93 // Start ping ticker
94 ticker := time.NewTicker(30 * time.Second)
95 defer ticker.Stop()
96
97 done := make(chan struct{})
98
99 // Goroutine to send pings
100 // TODO: Fix race condition - multiple goroutines can call close(done) concurrently
101 // Use sync.Once to ensure close(done) is called exactly once
102 // See PR review issue #4
103 go func() {
104 for {
105 select {
106 case <-ticker.C:
107 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
108 log.Printf("Ping error: %v", err)
109 close(done)
110 return
111 }
112 case <-done:
113 return
114 case <-ctx.Done():
115 return
116 }
117 }
118 }()
119
120 // Read messages
121 for {
122 select {
123 case <-ctx.Done():
124 return ctx.Err()
125 case <-done:
126 return fmt.Errorf("connection closed")
127 default:
128 _, message, err := conn.ReadMessage()
129 if err != nil {
130 close(done)
131 return fmt.Errorf("read error: %w", err)
132 }
133
134 // Reset read deadline on successful read
135 conn.SetReadDeadline(time.Now().Add(60 * time.Second))
136
137 if err := c.handleEvent(ctx, message); err != nil {
138 log.Printf("Error handling event: %v", err)
139 // Continue processing other events
140 }
141 }
142 }
143}
144
145// handleEvent processes a single Jetstream event
146func (c *UserEventConsumer) handleEvent(ctx context.Context, data []byte) error {
147 var event JetstreamEvent
148 if err := json.Unmarshal(data, &event); err != nil {
149 return fmt.Errorf("failed to parse event: %w", err)
150 }
151
152 // We're interested in identity events (handle updates) and account events (new users)
153 switch event.Kind {
154 case "identity":
155 return c.handleIdentityEvent(ctx, &event)
156 case "account":
157 return c.handleAccountEvent(ctx, &event)
158 default:
159 // Ignore other event types (commits, etc.)
160 return nil
161 }
162}
163
164// HandleIdentityEventPublic is a public wrapper for testing
165func (c *UserEventConsumer) HandleIdentityEventPublic(ctx context.Context, event *JetstreamEvent) error {
166 return c.handleIdentityEvent(ctx, event)
167}
168
169// handleIdentityEvent processes identity events (handle changes)
170func (c *UserEventConsumer) handleIdentityEvent(ctx context.Context, event *JetstreamEvent) error {
171 if event.Identity == nil {
172 return fmt.Errorf("identity event missing identity data")
173 }
174
175 did := event.Identity.Did
176 handle := event.Identity.Handle
177
178 if did == "" || handle == "" {
179 return fmt.Errorf("identity event missing did or handle")
180 }
181
182 log.Printf("Identity event: %s → %s", did, handle)
183
184 // For now, we'll create/update user on identity events
185 // In a full implementation, you'd want to:
186 // 1. Check if user exists
187 // 2. Update handle if changed
188 // 3. Resolve PDS URL from DID document
189
190 // Simplified: just try to create user (will be idempotent)
191 // We need PDS URL - for now use a placeholder
192 // TODO: Implement DID→PDS resolution via PLC directory (https://plc.directory/{did})
193 // For production federation support, resolve PDS endpoint from DID document
194 // For local dev, this works fine since we filter to our own PDS
195 // See PR review issue #2
196 pdsURL := "https://bsky.social" // Default Bluesky PDS
197
198 _, err := c.userService.CreateUser(ctx, users.CreateUserRequest{
199 DID: did,
200 Handle: handle,
201 PDSURL: pdsURL,
202 })
203
204 if err != nil {
205 // Check if it's a duplicate error (expected for idempotency)
206 if isDuplicateError(err) {
207 log.Printf("User already indexed: %s (%s)", handle, did)
208 return nil
209 }
210 return fmt.Errorf("failed to create user: %w", err)
211 }
212
213 log.Printf("Indexed new user: %s (%s)", handle, did)
214 return nil
215}
216
217// handleAccountEvent processes account events (account creation/updates)
218func (c *UserEventConsumer) handleAccountEvent(ctx context.Context, event *JetstreamEvent) error {
219 if event.Account == nil {
220 return fmt.Errorf("account event missing account data")
221 }
222
223 did := event.Account.Did
224 if did == "" {
225 return fmt.Errorf("account event missing did")
226 }
227
228 // Account events don't include handle, so we can't index yet
229 // We'll wait for the corresponding identity event
230 log.Printf("Account event for %s (waiting for identity event)", did)
231 return nil
232}
233
// isDuplicateError reports whether err's message looks like a uniqueness
// violation, i.e. it contains "already exists", "already taken", or
// "duplicate" (case-sensitive). A nil error is never a duplicate.
//
// NOTE(review): this matches on message text only. If the users package
// exposes sentinel errors, errors.Is would be more robust than substring
// matching — confirm.
func isDuplicateError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	for _, marker := range []string{"already exists", "already taken", "duplicate"} {
		if contains(msg, marker) {
			return true
		}
	}
	return false
}

// contains reports whether substr occurs within s, comparing bytes. An empty
// substr is found in every string, including "". Its results match
// strings.Contains.
func contains(s, substr string) bool {
	switch {
	case len(substr) > len(s):
		return false
	case s == substr:
		return true
	default:
		return len(s) != len(substr) && anySubstring(s, substr)
	}
}

// anySubstring scans every len(substr)-byte window of s and reports whether
// any window equals substr. It returns false when substr is longer than s.
func anySubstring(s, substr string) bool {
	for start, end := 0, len(substr); end <= len(s); start, end = start+1, end+1 {
		if s[start:end] == substr {
			return true
		}
	}
	return false
}