A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "time" 9 10 "Coves/internal/atproto/identity" 11 "Coves/internal/core/users" 12 "github.com/gorilla/websocket" 13) 14 15// JetstreamEvent represents an event from the Jetstream firehose 16// Jetstream documentation: https://docs.bsky.app/docs/advanced-guides/jetstream 17type JetstreamEvent struct { 18 Did string `json:"did"` 19 TimeUS int64 `json:"time_us"` 20 Kind string `json:"kind"` // "account", "commit", "identity" 21 Account *AccountEvent `json:"account,omitempty"` 22 Identity *IdentityEvent `json:"identity,omitempty"` 23 Commit *CommitEvent `json:"commit,omitempty"` 24} 25 26type AccountEvent struct { 27 Active bool `json:"active"` 28 Did string `json:"did"` 29 Seq int64 `json:"seq"` 30 Time string `json:"time"` 31} 32 33type IdentityEvent struct { 34 Did string `json:"did"` 35 Handle string `json:"handle"` 36 Seq int64 `json:"seq"` 37 Time string `json:"time"` 38} 39 40// CommitEvent represents a record commit from Jetstream 41type CommitEvent struct { 42 Rev string `json:"rev"` 43 Operation string `json:"operation"` // "create", "update", "delete" 44 Collection string `json:"collection"` 45 RKey string `json:"rkey"` 46 Record map[string]interface{} `json:"record,omitempty"` 47 CID string `json:"cid,omitempty"` 48} 49 50// UserEventConsumer consumes user-related events from Jetstream 51type UserEventConsumer struct { 52 userService users.UserService 53 identityResolver identity.Resolver 54 wsURL string 55 pdsFilter string // Optional: only index users from specific PDS 56} 57 58// NewUserEventConsumer creates a new Jetstream consumer for user events 59func NewUserEventConsumer(userService users.UserService, identityResolver identity.Resolver, wsURL string, pdsFilter string) *UserEventConsumer { 60 return &UserEventConsumer{ 61 userService: userService, 62 identityResolver: identityResolver, 63 wsURL: wsURL, 64 pdsFilter: pdsFilter, 65 } 66} 67 68// Start begins consuming events 
from Jetstream 69// Runs indefinitely, reconnecting on errors 70func (c *UserEventConsumer) Start(ctx context.Context) error { 71 log.Printf("Starting Jetstream user consumer: %s", c.wsURL) 72 73 for { 74 select { 75 case <-ctx.Done(): 76 log.Println("Jetstream consumer shutting down") 77 return ctx.Err() 78 default: 79 if err := c.connect(ctx); err != nil { 80 log.Printf("Jetstream connection error: %v. Retrying in 5s...", err) 81 time.Sleep(5 * time.Second) 82 continue 83 } 84 } 85 } 86} 87 88// connect establishes WebSocket connection and processes events 89func (c *UserEventConsumer) connect(ctx context.Context) error { 90 conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.wsURL, nil) 91 if err != nil { 92 return fmt.Errorf("failed to connect to Jetstream: %w", err) 93 } 94 defer conn.Close() 95 96 log.Println("Connected to Jetstream") 97 98 // Set read deadline to detect connection issues 99 conn.SetReadDeadline(time.Now().Add(60 * time.Second)) 100 101 // Set pong handler to keep connection alive 102 conn.SetPongHandler(func(string) error { 103 conn.SetReadDeadline(time.Now().Add(60 * time.Second)) 104 return nil 105 }) 106 107 // Start ping ticker 108 ticker := time.NewTicker(30 * time.Second) 109 defer ticker.Stop() 110 111 done := make(chan struct{}) 112 113 // Goroutine to send pings 114 // TODO: Fix race condition - multiple goroutines can call close(done) concurrently 115 // Use sync.Once to ensure close(done) is called exactly once 116 // See PR review issue #4 117 go func() { 118 for { 119 select { 120 case <-ticker.C: 121 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 122 log.Printf("Ping error: %v", err) 123 close(done) 124 return 125 } 126 case <-done: 127 return 128 case <-ctx.Done(): 129 return 130 } 131 } 132 }() 133 134 // Read messages 135 for { 136 select { 137 case <-ctx.Done(): 138 return ctx.Err() 139 case <-done: 140 return fmt.Errorf("connection closed") 141 default: 142 _, message, err := 
conn.ReadMessage() 143 if err != nil { 144 close(done) 145 return fmt.Errorf("read error: %w", err) 146 } 147 148 // Reset read deadline on successful read 149 conn.SetReadDeadline(time.Now().Add(60 * time.Second)) 150 151 if err := c.handleEvent(ctx, message); err != nil { 152 log.Printf("Error handling event: %v", err) 153 // Continue processing other events 154 } 155 } 156 } 157} 158 159// handleEvent processes a single Jetstream event 160func (c *UserEventConsumer) handleEvent(ctx context.Context, data []byte) error { 161 var event JetstreamEvent 162 if err := json.Unmarshal(data, &event); err != nil { 163 return fmt.Errorf("failed to parse event: %w", err) 164 } 165 166 // We're interested in identity events (handle updates) and account events (new users) 167 switch event.Kind { 168 case "identity": 169 return c.handleIdentityEvent(ctx, &event) 170 case "account": 171 return c.handleAccountEvent(ctx, &event) 172 default: 173 // Ignore other event types (commits, etc.) 174 return nil 175 } 176} 177 178// HandleIdentityEventPublic is a public wrapper for testing 179func (c *UserEventConsumer) HandleIdentityEventPublic(ctx context.Context, event *JetstreamEvent) error { 180 return c.handleIdentityEvent(ctx, event) 181} 182 183// handleIdentityEvent processes identity events (handle changes) 184func (c *UserEventConsumer) handleIdentityEvent(ctx context.Context, event *JetstreamEvent) error { 185 if event.Identity == nil { 186 return fmt.Errorf("identity event missing identity data") 187 } 188 189 did := event.Identity.Did 190 handle := event.Identity.Handle 191 192 if did == "" || handle == "" { 193 return fmt.Errorf("identity event missing did or handle") 194 } 195 196 log.Printf("Identity event: %s → %s", did, handle) 197 198 // Get existing user to check if handle changed 199 existingUser, err := c.userService.GetUserByDID(ctx, did) 200 if err != nil { 201 // User doesn't exist - create new user 202 pdsURL := "https://bsky.social" // Default Bluesky PDS 203 // 
TODO: Resolve PDS URL from DID document via PLC directory 204 205 _, createErr := c.userService.CreateUser(ctx, users.CreateUserRequest{ 206 DID: did, 207 Handle: handle, 208 PDSURL: pdsURL, 209 }) 210 211 if createErr != nil && !isDuplicateError(createErr) { 212 return fmt.Errorf("failed to create user: %w", createErr) 213 } 214 215 log.Printf("Indexed new user: %s (%s)", handle, did) 216 return nil 217 } 218 219 // User exists - check if handle changed 220 if existingUser.Handle != handle { 221 log.Printf("Handle changed: %s → %s (DID: %s)", existingUser.Handle, handle, did) 222 223 // CRITICAL: Update database FIRST, then purge cache 224 // This prevents race condition where cache gets refilled with stale data 225 _, updateErr := c.userService.UpdateHandle(ctx, did, handle) 226 if updateErr != nil { 227 return fmt.Errorf("failed to update handle: %w", updateErr) 228 } 229 230 // CRITICAL: Purge BOTH old handle and DID from cache 231 // Old handle: alice.bsky.social → did:plc:abc123 (must be removed) 232 if purgeErr := c.identityResolver.Purge(ctx, existingUser.Handle); purgeErr != nil { 233 log.Printf("Warning: failed to purge old handle cache for %s: %v", existingUser.Handle, purgeErr) 234 } 235 236 // DID: did:plc:abc123 → alice.bsky.social (must be removed) 237 if purgeErr := c.identityResolver.Purge(ctx, did); purgeErr != nil { 238 log.Printf("Warning: failed to purge DID cache for %s: %v", did, purgeErr) 239 } 240 241 log.Printf("Updated handle and purged cache: %s → %s", existingUser.Handle, handle) 242 } else { 243 log.Printf("Handle unchanged for %s (%s)", handle, did) 244 } 245 246 return nil 247} 248 249// handleAccountEvent processes account events (account creation/updates) 250func (c *UserEventConsumer) handleAccountEvent(ctx context.Context, event *JetstreamEvent) error { 251 if event.Account == nil { 252 return fmt.Errorf("account event missing account data") 253 } 254 255 did := event.Account.Did 256 if did == "" { 257 return fmt.Errorf("account 
event missing did") 258 } 259 260 // Account events don't include handle, so we can't index yet 261 // We'll wait for the corresponding identity event 262 log.Printf("Account event for %s (waiting for identity event)", did) 263 return nil 264} 265 266// isDuplicateError checks if error is due to duplicate DID/handle 267func isDuplicateError(err error) bool { 268 if err == nil { 269 return false 270 } 271 errStr := err.Error() 272 return contains(errStr, "already exists") || contains(errStr, "already taken") || contains(errStr, "duplicate") 273} 274 275func contains(s, substr string) bool { 276 return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && anySubstring(s, substr)) 277} 278 279func anySubstring(s, substr string) bool { 280 for i := 0; i <= len(s)-len(substr); i++ { 281 if s[i:i+len(substr)] == substr { 282 return true 283 } 284 } 285 return false 286}