A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "Coves/internal/atproto/identity" 5 "Coves/internal/core/users" 6 "context" 7 "encoding/json" 8 "fmt" 9 "log" 10 "sync" 11 "time" 12 13 "github.com/gorilla/websocket" 14) 15 16// JetstreamEvent represents an event from the Jetstream firehose 17// Jetstream documentation: https://docs.bsky.app/docs/advanced-guides/jetstream 18type JetstreamEvent struct { 19 Account *AccountEvent `json:"account,omitempty"` 20 Identity *IdentityEvent `json:"identity,omitempty"` 21 Commit *CommitEvent `json:"commit,omitempty"` 22 Did string `json:"did"` 23 Kind string `json:"kind"` 24 TimeUS int64 `json:"time_us"` 25} 26 27type AccountEvent struct { 28 Did string `json:"did"` 29 Time string `json:"time"` 30 Seq int64 `json:"seq"` 31 Active bool `json:"active"` 32} 33 34type IdentityEvent struct { 35 Did string `json:"did"` 36 Handle string `json:"handle"` 37 Time string `json:"time"` 38 Seq int64 `json:"seq"` 39} 40 41// CommitEvent represents a record commit from Jetstream 42type CommitEvent struct { 43 Rev string `json:"rev"` 44 Operation string `json:"operation"` // "create", "update", "delete" 45 Collection string `json:"collection"` 46 RKey string `json:"rkey"` 47 Record map[string]interface{} `json:"record,omitempty"` 48 CID string `json:"cid,omitempty"` 49} 50 51// UserEventConsumer consumes user-related events from Jetstream 52type UserEventConsumer struct { 53 userService users.UserService 54 identityResolver identity.Resolver 55 wsURL string 56 pdsFilter string // Optional: only index users from specific PDS 57} 58 59// NewUserEventConsumer creates a new Jetstream consumer for user events 60func NewUserEventConsumer(userService users.UserService, identityResolver identity.Resolver, wsURL, pdsFilter string) *UserEventConsumer { 61 return &UserEventConsumer{ 62 userService: userService, 63 identityResolver: identityResolver, 64 wsURL: wsURL, 65 pdsFilter: pdsFilter, 66 } 67} 68 69// Start begins consuming events from Jetstream 70// Runs 
indefinitely, reconnecting on errors 71func (c *UserEventConsumer) Start(ctx context.Context) error { 72 log.Printf("Starting Jetstream user consumer: %s", c.wsURL) 73 74 for { 75 select { 76 case <-ctx.Done(): 77 log.Println("Jetstream consumer shutting down") 78 return ctx.Err() 79 default: 80 if err := c.connect(ctx); err != nil { 81 log.Printf("Jetstream connection error: %v. Retrying in 5s...", err) 82 time.Sleep(5 * time.Second) 83 continue 84 } 85 } 86 } 87} 88 89// connect establishes WebSocket connection and processes events 90func (c *UserEventConsumer) connect(ctx context.Context) error { 91 conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.wsURL, nil) 92 if err != nil { 93 return fmt.Errorf("failed to connect to Jetstream: %w", err) 94 } 95 defer func() { 96 if err := conn.Close(); err != nil { 97 log.Printf("Failed to close WebSocket connection: %v", err) 98 } 99 }() 100 101 log.Println("Connected to Jetstream") 102 103 // Set read deadline to detect connection issues 104 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { 105 log.Printf("Failed to set read deadline: %v", err) 106 } 107 108 // Set pong handler to keep connection alive 109 conn.SetPongHandler(func(string) error { 110 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { 111 log.Printf("Failed to set read deadline in pong handler: %v", err) 112 } 113 return nil 114 }) 115 116 // Start ping ticker 117 ticker := time.NewTicker(30 * time.Second) 118 defer ticker.Stop() 119 120 done := make(chan struct{}) 121 var closeOnce sync.Once // Ensure done channel is only closed once 122 123 // Goroutine to send pings 124 go func() { 125 for { 126 select { 127 case <-ticker.C: 128 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 129 log.Printf("Ping error: %v", err) 130 closeOnce.Do(func() { close(done) }) 131 return 132 } 133 case <-done: 134 return 135 case <-ctx.Done(): 136 return 137 } 138 } 139 }() 140 141 // Read 
messages 142 for { 143 select { 144 case <-ctx.Done(): 145 return ctx.Err() 146 case <-done: 147 return fmt.Errorf("connection closed") 148 default: 149 _, message, err := conn.ReadMessage() 150 if err != nil { 151 closeOnce.Do(func() { close(done) }) 152 return fmt.Errorf("read error: %w", err) 153 } 154 155 // Reset read deadline on successful read 156 if err := conn.SetReadDeadline(time.Now().Add(60 * time.Second)); err != nil { 157 log.Printf("Failed to set read deadline: %v", err) 158 } 159 160 if err := c.handleEvent(ctx, message); err != nil { 161 log.Printf("Error handling event: %v", err) 162 // Continue processing other events 163 } 164 } 165 } 166} 167 168// handleEvent processes a single Jetstream event 169func (c *UserEventConsumer) handleEvent(ctx context.Context, data []byte) error { 170 var event JetstreamEvent 171 if err := json.Unmarshal(data, &event); err != nil { 172 return fmt.Errorf("failed to parse event: %w", err) 173 } 174 175 // We're interested in identity events (handle updates) and account events (new users) 176 switch event.Kind { 177 case "identity": 178 return c.handleIdentityEvent(ctx, &event) 179 case "account": 180 return c.handleAccountEvent(ctx, &event) 181 default: 182 // Ignore other event types (commits, etc.) 
183 return nil 184 } 185} 186 187// HandleIdentityEventPublic is a public wrapper for testing 188func (c *UserEventConsumer) HandleIdentityEventPublic(ctx context.Context, event *JetstreamEvent) error { 189 return c.handleIdentityEvent(ctx, event) 190} 191 192// handleIdentityEvent processes identity events (handle changes) 193func (c *UserEventConsumer) handleIdentityEvent(ctx context.Context, event *JetstreamEvent) error { 194 if event.Identity == nil { 195 return fmt.Errorf("identity event missing identity data") 196 } 197 198 did := event.Identity.Did 199 handle := event.Identity.Handle 200 201 if did == "" || handle == "" { 202 return fmt.Errorf("identity event missing did or handle") 203 } 204 205 log.Printf("Identity event: %s → %s", did, handle) 206 207 // Get existing user to check if handle changed 208 existingUser, err := c.userService.GetUserByDID(ctx, did) 209 if err != nil { 210 // User doesn't exist - create new user 211 pdsURL := "https://bsky.social" // Default Bluesky PDS 212 // TODO: Resolve PDS URL from DID document via PLC directory 213 214 _, createErr := c.userService.CreateUser(ctx, users.CreateUserRequest{ 215 DID: did, 216 Handle: handle, 217 PDSURL: pdsURL, 218 }) 219 220 if createErr != nil && !isDuplicateError(createErr) { 221 return fmt.Errorf("failed to create user: %w", createErr) 222 } 223 224 log.Printf("Indexed new user: %s (%s)", handle, did) 225 return nil 226 } 227 228 // User exists - check if handle changed 229 if existingUser.Handle != handle { 230 log.Printf("Handle changed: %s → %s (DID: %s)", existingUser.Handle, handle, did) 231 232 // CRITICAL: Update database FIRST, then purge cache 233 // This prevents race condition where cache gets refilled with stale data 234 _, updateErr := c.userService.UpdateHandle(ctx, did, handle) 235 if updateErr != nil { 236 return fmt.Errorf("failed to update handle: %w", updateErr) 237 } 238 239 // CRITICAL: Purge BOTH old handle and DID from cache 240 // Old handle: alice.bsky.social → 
did:plc:abc123 (must be removed) 241 if purgeErr := c.identityResolver.Purge(ctx, existingUser.Handle); purgeErr != nil { 242 log.Printf("Warning: failed to purge old handle cache for %s: %v", existingUser.Handle, purgeErr) 243 } 244 245 // DID: did:plc:abc123 → alice.bsky.social (must be removed) 246 if purgeErr := c.identityResolver.Purge(ctx, did); purgeErr != nil { 247 log.Printf("Warning: failed to purge DID cache for %s: %v", did, purgeErr) 248 } 249 250 log.Printf("Updated handle and purged cache: %s → %s", existingUser.Handle, handle) 251 } else { 252 log.Printf("Handle unchanged for %s (%s)", handle, did) 253 } 254 255 return nil 256} 257 258// handleAccountEvent processes account events (account creation/updates) 259func (c *UserEventConsumer) handleAccountEvent(ctx context.Context, event *JetstreamEvent) error { 260 if event.Account == nil { 261 return fmt.Errorf("account event missing account data") 262 } 263 264 did := event.Account.Did 265 if did == "" { 266 return fmt.Errorf("account event missing did") 267 } 268 269 // Account events don't include handle, so we can't index yet 270 // We'll wait for the corresponding identity event 271 log.Printf("Account event for %s (waiting for identity event)", did) 272 return nil 273} 274 275// isDuplicateError checks if error is due to duplicate DID/handle 276func isDuplicateError(err error) bool { 277 if err == nil { 278 return false 279 } 280 errStr := err.Error() 281 return contains(errStr, "already exists") || contains(errStr, "already taken") || contains(errStr, "duplicate") 282} 283 284func contains(s, substr string) bool { 285 return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && anySubstring(s, substr)) 286} 287 288func anySubstring(s, substr string) bool { 289 for i := 0; i <= len(s)-len(substr); i++ { 290 if s[i:i+len(substr)] == substr { 291 return true 292 } 293 } 294 return false 295}