// A community-based topic aggregation platform built on atproto.
1package jetstream 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "sync" 9 "time" 10 11 "Coves/internal/atproto/identity" 12 "Coves/internal/core/users" 13 14 "github.com/gorilla/websocket" 15) 16 17// JetstreamEvent represents an event from the Jetstream firehose 18// Jetstream documentation: https://docs.bsky.app/docs/advanced-guides/jetstream 19type JetstreamEvent struct { 20 Account *AccountEvent `json:"account,omitempty"` 21 Identity *IdentityEvent `json:"identity,omitempty"` 22 Commit *CommitEvent `json:"commit,omitempty"` 23 Did string `json:"did"` 24 Kind string `json:"kind"` 25 TimeUS int64 `json:"time_us"` 26} 27 28type AccountEvent struct { 29 Did string `json:"did"` 30 Time string `json:"time"` 31 Seq int64 `json:"seq"` 32 Active bool `json:"active"` 33} 34 35type IdentityEvent struct { 36 Did string `json:"did"` 37 Handle string `json:"handle"` 38 Time string `json:"time"` 39 Seq int64 `json:"seq"` 40} 41 42// CommitEvent represents a record commit from Jetstream 43type CommitEvent struct { 44 Rev string `json:"rev"` 45 Operation string `json:"operation"` // "create", "update", "delete" 46 Collection string `json:"collection"` 47 RKey string `json:"rkey"` 48 Record map[string]interface{} `json:"record,omitempty"` 49 CID string `json:"cid,omitempty"` 50} 51 52// UserEventConsumer consumes user-related events from Jetstream 53type UserEventConsumer struct { 54 userService users.UserService 55 identityResolver identity.Resolver 56 wsURL string 57 pdsFilter string // Optional: only index users from specific PDS 58} 59 60// NewUserEventConsumer creates a new Jetstream consumer for user events 61func NewUserEventConsumer(userService users.UserService, identityResolver identity.Resolver, wsURL, pdsFilter string) *UserEventConsumer { 62 return &UserEventConsumer{ 63 userService: userService, 64 identityResolver: identityResolver, 65 wsURL: wsURL, 66 pdsFilter: pdsFilter, 67 } 68} 69 70// Start begins consuming events from Jetstream 71// Runs 
indefinitely, reconnecting on errors 72func (c *UserEventConsumer) Start(ctx context.Context) error { 73 log.Printf("Starting Jetstream user consumer: %s", c.wsURL) 74 75 for { 76 select { 77 case <-ctx.Done(): 78 log.Println("Jetstream consumer shutting down") 79 return ctx.Err() 80 default: 81 if err := c.connect(ctx); err != nil { 82 log.Printf("Jetstream connection error: %v. Retrying in 5s...", err) 83 time.Sleep(5 * time.Second) 84 continue 85 } 86 } 87 } 88} 89 90// connect establishes WebSocket connection and processes events 91func (c *UserEventConsumer) connect(ctx context.Context) error { 92 conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.wsURL, nil) 93 if err != nil { 94 return fmt.Errorf("failed to connect to Jetstream: %w", err) 95 } 96 defer func() { 97 if err := conn.Close(); err != nil { 98 log.Printf("Failed to close WebSocket connection: %v", err) 99 } 100 }() 101 102 log.Println("Connected to Jetstream") 103 104 // Set read deadline to detect connection issues 105 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { 106 log.Printf("Failed to set read deadline: %v", err) 107 } 108 109 // Set pong handler to keep connection alive 110 conn.SetPongHandler(func(string) error { 111 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { 112 log.Printf("Failed to set read deadline in pong handler: %v", err) 113 } 114 return nil 115 }) 116 117 // Start ping ticker 118 ticker := time.NewTicker(30 * time.Second) 119 defer ticker.Stop() 120 121 done := make(chan struct{}) 122 var closeOnce sync.Once // Ensure done channel is only closed once 123 124 // Goroutine to send pings 125 go func() { 126 for { 127 select { 128 case <-ticker.C: 129 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 130 log.Printf("Ping error: %v", err) 131 closeOnce.Do(func() { close(done) }) 132 return 133 } 134 case <-done: 135 return 136 case <-ctx.Done(): 137 return 138 } 139 } 140 }() 141 142 // Read 
messages 143 for { 144 select { 145 case <-ctx.Done(): 146 return ctx.Err() 147 case <-done: 148 return fmt.Errorf("connection closed") 149 default: 150 _, message, err := conn.ReadMessage() 151 if err != nil { 152 closeOnce.Do(func() { close(done) }) 153 return fmt.Errorf("read error: %w", err) 154 } 155 156 // Reset read deadline on successful read 157 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { 158 log.Printf("Failed to set read deadline: %v", err) 159 } 160 161 if err := c.handleEvent(ctx, message); err != nil { 162 log.Printf("Error handling event: %v", err) 163 // Continue processing other events 164 } 165 } 166 } 167} 168 169// handleEvent processes a single Jetstream event 170func (c *UserEventConsumer) handleEvent(ctx context.Context, data []byte) error { 171 var event JetstreamEvent 172 if err := json.Unmarshal(data, &event); err != nil { 173 return fmt.Errorf("failed to parse event: %w", err) 174 } 175 176 // We're interested in identity events (handle updates) and account events (new users) 177 switch event.Kind { 178 case "identity": 179 return c.handleIdentityEvent(ctx, &event) 180 case "account": 181 return c.handleAccountEvent(ctx, &event) 182 default: 183 // Ignore other event types (commits, etc.) 
184 return nil 185 } 186} 187 188// HandleIdentityEventPublic is a public wrapper for testing 189func (c *UserEventConsumer) HandleIdentityEventPublic(ctx context.Context, event *JetstreamEvent) error { 190 return c.handleIdentityEvent(ctx, event) 191} 192 193// handleIdentityEvent processes identity events (handle changes) 194func (c *UserEventConsumer) handleIdentityEvent(ctx context.Context, event *JetstreamEvent) error { 195 if event.Identity == nil { 196 return fmt.Errorf("identity event missing identity data") 197 } 198 199 did := event.Identity.Did 200 handle := event.Identity.Handle 201 202 if did == "" || handle == "" { 203 return fmt.Errorf("identity event missing did or handle") 204 } 205 206 log.Printf("Identity event: %s → %s", did, handle) 207 208 // Get existing user to check if handle changed 209 existingUser, err := c.userService.GetUserByDID(ctx, did) 210 if err != nil { 211 // User doesn't exist - create new user 212 pdsURL := "https://bsky.social" // Default Bluesky PDS 213 // TODO: Resolve PDS URL from DID document via PLC directory 214 215 _, createErr := c.userService.CreateUser(ctx, users.CreateUserRequest{ 216 DID: did, 217 Handle: handle, 218 PDSURL: pdsURL, 219 }) 220 221 if createErr != nil && !isDuplicateError(createErr) { 222 return fmt.Errorf("failed to create user: %w", createErr) 223 } 224 225 log.Printf("Indexed new user: %s (%s)", handle, did) 226 return nil 227 } 228 229 // User exists - check if handle changed 230 if existingUser.Handle != handle { 231 log.Printf("Handle changed: %s → %s (DID: %s)", existingUser.Handle, handle, did) 232 233 // CRITICAL: Update database FIRST, then purge cache 234 // This prevents race condition where cache gets refilled with stale data 235 _, updateErr := c.userService.UpdateHandle(ctx, did, handle) 236 if updateErr != nil { 237 return fmt.Errorf("failed to update handle: %w", updateErr) 238 } 239 240 // CRITICAL: Purge BOTH old handle and DID from cache 241 // Old handle: alice.bsky.social → 
did:plc:abc123 (must be removed) 242 if purgeErr := c.identityResolver.Purge(ctx, existingUser.Handle); purgeErr != nil { 243 log.Printf("Warning: failed to purge old handle cache for %s: %v", existingUser.Handle, purgeErr) 244 } 245 246 // DID: did:plc:abc123 → alice.bsky.social (must be removed) 247 if purgeErr := c.identityResolver.Purge(ctx, did); purgeErr != nil { 248 log.Printf("Warning: failed to purge DID cache for %s: %v", did, purgeErr) 249 } 250 251 log.Printf("Updated handle and purged cache: %s → %s", existingUser.Handle, handle) 252 } else { 253 log.Printf("Handle unchanged for %s (%s)", handle, did) 254 } 255 256 return nil 257} 258 259// handleAccountEvent processes account events (account creation/updates) 260func (c *UserEventConsumer) handleAccountEvent(ctx context.Context, event *JetstreamEvent) error { 261 if event.Account == nil { 262 return fmt.Errorf("account event missing account data") 263 } 264 265 did := event.Account.Did 266 if did == "" { 267 return fmt.Errorf("account event missing did") 268 } 269 270 // Account events don't include handle, so we can't index yet 271 // We'll wait for the corresponding identity event 272 log.Printf("Account event for %s (waiting for identity event)", did) 273 return nil 274} 275 276// isDuplicateError checks if error is due to duplicate DID/handle 277func isDuplicateError(err error) bool { 278 if err == nil { 279 return false 280 } 281 errStr := err.Error() 282 return contains(errStr, "already exists") || contains(errStr, "already taken") || contains(errStr, "duplicate") 283} 284 285func contains(s, substr string) bool { 286 return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && anySubstring(s, substr)) 287} 288 289func anySubstring(s, substr string) bool { 290 for i := 0; i <= len(s)-len(substr); i++ { 291 if s[i:i+len(substr)] == substr { 292 return true 293 } 294 } 295 return false 296}