A community-based topic aggregation platform built on atproto
1package jetstream 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "time" 9 10 "Coves/internal/core/users" 11 "github.com/gorilla/websocket" 12) 13 14// JetstreamEvent represents an event from the Jetstream firehose 15// Jetstream documentation: https://docs.bsky.app/docs/advanced-guides/jetstream 16type JetstreamEvent struct { 17 Did string `json:"did"` 18 TimeUS int64 `json:"time_us"` 19 Kind string `json:"kind"` // "account", "commit", "identity" 20 Account *AccountEvent `json:"account,omitempty"` 21 Identity *IdentityEvent `json:"identity,omitempty"` 22} 23 24type AccountEvent struct { 25 Active bool `json:"active"` 26 Did string `json:"did"` 27 Seq int64 `json:"seq"` 28 Time string `json:"time"` 29} 30 31type IdentityEvent struct { 32 Did string `json:"did"` 33 Handle string `json:"handle"` 34 Seq int64 `json:"seq"` 35 Time string `json:"time"` 36} 37 38// UserEventConsumer consumes user-related events from Jetstream 39type UserEventConsumer struct { 40 userService users.UserService 41 wsURL string 42 pdsFilter string // Optional: only index users from specific PDS 43} 44 45// NewUserEventConsumer creates a new Jetstream consumer for user events 46func NewUserEventConsumer(userService users.UserService, wsURL string, pdsFilter string) *UserEventConsumer { 47 return &UserEventConsumer{ 48 userService: userService, 49 wsURL: wsURL, 50 pdsFilter: pdsFilter, 51 } 52} 53 54// Start begins consuming events from Jetstream 55// Runs indefinitely, reconnecting on errors 56func (c *UserEventConsumer) Start(ctx context.Context) error { 57 log.Printf("Starting Jetstream user consumer: %s", c.wsURL) 58 59 for { 60 select { 61 case <-ctx.Done(): 62 log.Println("Jetstream consumer shutting down") 63 return ctx.Err() 64 default: 65 if err := c.connect(ctx); err != nil { 66 log.Printf("Jetstream connection error: %v. 
Retrying in 5s...", err) 67 time.Sleep(5 * time.Second) 68 continue 69 } 70 } 71 } 72} 73 74// connect establishes WebSocket connection and processes events 75func (c *UserEventConsumer) connect(ctx context.Context) error { 76 conn, _, err := websocket.DefaultDialer.DialContext(ctx, c.wsURL, nil) 77 if err != nil { 78 return fmt.Errorf("failed to connect to Jetstream: %w", err) 79 } 80 defer conn.Close() 81 82 log.Println("Connected to Jetstream") 83 84 // Set read deadline to detect connection issues 85 conn.SetReadDeadline(time.Now().Add(60 * time.Second)) 86 87 // Set pong handler to keep connection alive 88 conn.SetPongHandler(func(string) error { 89 conn.SetReadDeadline(time.Now().Add(60 * time.Second)) 90 return nil 91 }) 92 93 // Start ping ticker 94 ticker := time.NewTicker(30 * time.Second) 95 defer ticker.Stop() 96 97 done := make(chan struct{}) 98 99 // Goroutine to send pings 100 // TODO: Fix race condition - multiple goroutines can call close(done) concurrently 101 // Use sync.Once to ensure close(done) is called exactly once 102 // See PR review issue #4 103 go func() { 104 for { 105 select { 106 case <-ticker.C: 107 if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { 108 log.Printf("Ping error: %v", err) 109 close(done) 110 return 111 } 112 case <-done: 113 return 114 case <-ctx.Done(): 115 return 116 } 117 } 118 }() 119 120 // Read messages 121 for { 122 select { 123 case <-ctx.Done(): 124 return ctx.Err() 125 case <-done: 126 return fmt.Errorf("connection closed") 127 default: 128 _, message, err := conn.ReadMessage() 129 if err != nil { 130 close(done) 131 return fmt.Errorf("read error: %w", err) 132 } 133 134 // Reset read deadline on successful read 135 conn.SetReadDeadline(time.Now().Add(60 * time.Second)) 136 137 if err := c.handleEvent(ctx, message); err != nil { 138 log.Printf("Error handling event: %v", err) 139 // Continue processing other events 140 } 141 } 142 } 143} 144 145// handleEvent processes a single Jetstream 
event 146func (c *UserEventConsumer) handleEvent(ctx context.Context, data []byte) error { 147 var event JetstreamEvent 148 if err := json.Unmarshal(data, &event); err != nil { 149 return fmt.Errorf("failed to parse event: %w", err) 150 } 151 152 // We're interested in identity events (handle updates) and account events (new users) 153 switch event.Kind { 154 case "identity": 155 return c.handleIdentityEvent(ctx, &event) 156 case "account": 157 return c.handleAccountEvent(ctx, &event) 158 default: 159 // Ignore other event types (commits, etc.) 160 return nil 161 } 162} 163 164// HandleIdentityEventPublic is a public wrapper for testing 165func (c *UserEventConsumer) HandleIdentityEventPublic(ctx context.Context, event *JetstreamEvent) error { 166 return c.handleIdentityEvent(ctx, event) 167} 168 169// handleIdentityEvent processes identity events (handle changes) 170func (c *UserEventConsumer) handleIdentityEvent(ctx context.Context, event *JetstreamEvent) error { 171 if event.Identity == nil { 172 return fmt.Errorf("identity event missing identity data") 173 } 174 175 did := event.Identity.Did 176 handle := event.Identity.Handle 177 178 if did == "" || handle == "" { 179 return fmt.Errorf("identity event missing did or handle") 180 } 181 182 log.Printf("Identity event: %s → %s", did, handle) 183 184 // For now, we'll create/update user on identity events 185 // In a full implementation, you'd want to: 186 // 1. Check if user exists 187 // 2. Update handle if changed 188 // 3. 
Resolve PDS URL from DID document 189 190 // Simplified: just try to create user (will be idempotent) 191 // We need PDS URL - for now use a placeholder 192 // TODO: Implement DID→PDS resolution via PLC directory (https://plc.directory/{did}) 193 // For production federation support, resolve PDS endpoint from DID document 194 // For local dev, this works fine since we filter to our own PDS 195 // See PR review issue #2 196 pdsURL := "https://bsky.social" // Default Bluesky PDS 197 198 _, err := c.userService.CreateUser(ctx, users.CreateUserRequest{ 199 DID: did, 200 Handle: handle, 201 PDSURL: pdsURL, 202 }) 203 204 if err != nil { 205 // Check if it's a duplicate error (expected for idempotency) 206 if isDuplicateError(err) { 207 log.Printf("User already indexed: %s (%s)", handle, did) 208 return nil 209 } 210 return fmt.Errorf("failed to create user: %w", err) 211 } 212 213 log.Printf("Indexed new user: %s (%s)", handle, did) 214 return nil 215} 216 217// handleAccountEvent processes account events (account creation/updates) 218func (c *UserEventConsumer) handleAccountEvent(ctx context.Context, event *JetstreamEvent) error { 219 if event.Account == nil { 220 return fmt.Errorf("account event missing account data") 221 } 222 223 did := event.Account.Did 224 if did == "" { 225 return fmt.Errorf("account event missing did") 226 } 227 228 // Account events don't include handle, so we can't index yet 229 // We'll wait for the corresponding identity event 230 log.Printf("Account event for %s (waiting for identity event)", did) 231 return nil 232} 233 234// isDuplicateError checks if error is due to duplicate DID/handle 235func isDuplicateError(err error) bool { 236 if err == nil { 237 return false 238 } 239 errStr := err.Error() 240 return contains(errStr, "already exists") || contains(errStr, "already taken") || contains(errStr, "duplicate") 241} 242 243func contains(s, substr string) bool { 244 return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && 
anySubstring(s, substr)) 245} 246 247func anySubstring(s, substr string) bool { 248 for i := 0; i <= len(s)-len(substr); i++ { 249 if s[i:i+len(substr)] == substr { 250 return true 251 } 252 } 253 return false 254}