A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "time" 9 10 "Coves/internal/atproto/identity" 11 "Coves/internal/core/users" 12 "github.com/gorilla/websocket" 13) 14 15// JetstreamEvent represents an event from the Jetstream firehose 16// Jetstream documentation: https://docs.bsky.app/docs/advanced-guides/jetstream 17type JetstreamEvent struct { 18 Did string `json:"did"` 19 TimeUS int64 `json:"time_us"` 20 Kind string `json:"kind"` // "account", "commit", "identity" 21 Account *AccountEvent `json:"account,omitempty"` 22 Identity *IdentityEvent `json:"identity,omitempty"` 23} 24 25type AccountEvent struct { 26 Active bool `json:"active"` 27 Did string `json:"did"` 28 Seq int64 `json:"seq"` 29 Time string `json:"time"` 30} 31 32type IdentityEvent struct { 33 Did string `json:"did"` 34 Handle string `json:"handle"` 35 Seq int64 `json:"seq"` 36 Time string `json:"time"` 37} 38 39// UserEventConsumer consumes user-related events from Jetstream 40type UserEventConsumer struct { 41 userService users.UserService 42 identityResolver identity.Resolver 43 wsURL string 44 pdsFilter string // Optional: only index users from specific PDS 45} 46 47// NewUserEventConsumer creates a new Jetstream consumer for user events 48func NewUserEventConsumer(userService users.UserService, identityResolver identity.Resolver, wsURL string, pdsFilter string) *UserEventConsumer { 49 return &UserEventConsumer{ 50 userService: userService, 51 identityResolver: identityResolver, 52 wsURL: wsURL, 53 pdsFilter: pdsFilter, 54 } 55} 56 57// Start begins consuming events from Jetstream 58// Runs indefinitely, reconnecting on errors 59func (c *UserEventConsumer) Start(ctx context.Context) error { 60 log.Printf("Starting Jetstream user consumer: %s", c.wsURL) 61 62 for { 63 select { 64 case <-ctx.Done(): 65 log.Println("Jetstream consumer shutting down") 66 return ctx.Err() 67 default: 68 if err := c.connect(ctx); err != nil { 69 log.Printf("Jetstream connection error: %v. 
Retrying in 5s...", err) 70 time.Sleep(5 * time.Second) 71 continue 72 } 73 } 74 } 75} 76 77// connect establishes WebSocket connection and processes events 78func (c *UserEventConsumer) connect(ctx context.Context) error { 79 conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.wsURL, nil) 80 if err != nil { 81 return fmt.Errorf("failed to connect to Jetstream: %w", err) 82 } 83 defer conn.Close() 84 85 log.Println("Connected to Jetstream") 86 87 // Set read deadline to detect connection issues 88 conn.SetReadDeadline(time.Now().Add(60 * time.Second)) 89 90 // Set pong handler to keep connection alive 91 conn.SetPongHandler(func(string) error { 92 conn.SetReadDeadline(time.Now().Add(60 * time.Second)) 93 return nil 94 }) 95 96 // Start ping ticker 97 ticker := time.NewTicker(30 * time.Second) 98 defer ticker.Stop() 99 100 done := make(chan struct{}) 101 102 // Goroutine to send pings 103 // TODO: Fix race condition - multiple goroutines can call close(done) concurrently 104 // Use sync.Once to ensure close(done) is called exactly once 105 // See PR review issue #4 106 go func() { 107 for { 108 select { 109 case <-ticker.C: 110 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 111 log.Printf("Ping error: %v", err) 112 close(done) 113 return 114 } 115 case <-done: 116 return 117 case <-ctx.Done(): 118 return 119 } 120 } 121 }() 122 123 // Read messages 124 for { 125 select { 126 case <-ctx.Done(): 127 return ctx.Err() 128 case <-done: 129 return fmt.Errorf("connection closed") 130 default: 131 _, message, err := conn.ReadMessage() 132 if err != nil { 133 close(done) 134 return fmt.Errorf("read error: %w", err) 135 } 136 137 // Reset read deadline on successful read 138 conn.SetReadDeadline(time.Now().Add(60 * time.Second)) 139 140 if err := c.handleEvent(ctx, message); err != nil { 141 log.Printf("Error handling event: %v", err) 142 // Continue processing other events 143 } 144 } 145 } 146} 147 148// handleEvent processes a single Jetstream 
event 149func (c *UserEventConsumer) handleEvent(ctx context.Context, data []byte) error { 150 var event JetstreamEvent 151 if err := json.Unmarshal(data, &event); err != nil { 152 return fmt.Errorf("failed to parse event: %w", err) 153 } 154 155 // We're interested in identity events (handle updates) and account events (new users) 156 switch event.Kind { 157 case "identity": 158 return c.handleIdentityEvent(ctx, &event) 159 case "account": 160 return c.handleAccountEvent(ctx, &event) 161 default: 162 // Ignore other event types (commits, etc.) 163 return nil 164 } 165} 166 167// HandleIdentityEventPublic is a public wrapper for testing 168func (c *UserEventConsumer) HandleIdentityEventPublic(ctx context.Context, event *JetstreamEvent) error { 169 return c.handleIdentityEvent(ctx, event) 170} 171 172// handleIdentityEvent processes identity events (handle changes) 173func (c *UserEventConsumer) handleIdentityEvent(ctx context.Context, event *JetstreamEvent) error { 174 if event.Identity == nil { 175 return fmt.Errorf("identity event missing identity data") 176 } 177 178 did := event.Identity.Did 179 handle := event.Identity.Handle 180 181 if did == "" || handle == "" { 182 return fmt.Errorf("identity event missing did or handle") 183 } 184 185 log.Printf("Identity event: %s → %s", did, handle) 186 187 // Get existing user to check if handle changed 188 existingUser, err := c.userService.GetUserByDID(ctx, did) 189 if err != nil { 190 // User doesn't exist - create new user 191 pdsURL := "https://bsky.social" // Default Bluesky PDS 192 // TODO: Resolve PDS URL from DID document via PLC directory 193 194 _, createErr := c.userService.CreateUser(ctx, users.CreateUserRequest{ 195 DID: did, 196 Handle: handle, 197 PDSURL: pdsURL, 198 }) 199 200 if createErr != nil && !isDuplicateError(createErr) { 201 return fmt.Errorf("failed to create user: %w", createErr) 202 } 203 204 log.Printf("Indexed new user: %s (%s)", handle, did) 205 return nil 206 } 207 208 // User exists - 
check if handle changed 209 if existingUser.Handle != handle { 210 log.Printf("Handle changed: %s → %s (DID: %s)", existingUser.Handle, handle, did) 211 212 // CRITICAL: Update database FIRST, then purge cache 213 // This prevents race condition where cache gets refilled with stale data 214 _, updateErr := c.userService.UpdateHandle(ctx, did, handle) 215 if updateErr != nil { 216 return fmt.Errorf("failed to update handle: %w", updateErr) 217 } 218 219 // CRITICAL: Purge BOTH old handle and DID from cache 220 // Old handle: alice.bsky.social → did:plc:abc123 (must be removed) 221 if purgeErr := c.identityResolver.Purge(ctx, existingUser.Handle); purgeErr != nil { 222 log.Printf("Warning: failed to purge old handle cache for %s: %v", existingUser.Handle, purgeErr) 223 } 224 225 // DID: did:plc:abc123 → alice.bsky.social (must be removed) 226 if purgeErr := c.identityResolver.Purge(ctx, did); purgeErr != nil { 227 log.Printf("Warning: failed to purge DID cache for %s: %v", did, purgeErr) 228 } 229 230 log.Printf("Updated handle and purged cache: %s → %s", existingUser.Handle, handle) 231 } else { 232 log.Printf("Handle unchanged for %s (%s)", handle, did) 233 } 234 235 return nil 236} 237 238// handleAccountEvent processes account events (account creation/updates) 239func (c *UserEventConsumer) handleAccountEvent(ctx context.Context, event *JetstreamEvent) error { 240 if event.Account == nil { 241 return fmt.Errorf("account event missing account data") 242 } 243 244 did := event.Account.Did 245 if did == "" { 246 return fmt.Errorf("account event missing did") 247 } 248 249 // Account events don't include handle, so we can't index yet 250 // We'll wait for the corresponding identity event 251 log.Printf("Account event for %s (waiting for identity event)", did) 252 return nil 253} 254 255// isDuplicateError checks if error is due to duplicate DID/handle 256func isDuplicateError(err error) bool { 257 if err == nil { 258 return false 259 } 260 errStr := err.Error() 261 
return contains(errStr, "already exists") || contains(errStr, "already taken") || contains(errStr, "duplicate") 262} 263 264func contains(s, substr string) bool { 265 return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && anySubstring(s, substr)) 266} 267 268func anySubstring(s, substr string) bool { 269 for i := 0; i <= len(s)-len(substr); i++ { 270 if s[i:i+len(substr)] == substr { 271 return true 272 } 273 } 274 return false 275}