It's for when you want to get notified about likes on your reposts.
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "log/slog" 9 "net/http" 10 "sync" 11 "time" 12 13 "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/api/bsky" 15 "github.com/bluesky-social/indigo/xrpc" 16 "github.com/bluesky-social/jetstream/pkg/client" 17 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 18 "github.com/bluesky-social/jetstream/pkg/models" 19 "github.com/gorilla/mux" 20 "github.com/gorilla/websocket" 21) 22 23type Set[T comparable] map[T]struct{} 24 25// Data structures 26type SubscriberData struct { 27 DID string 28 Conn *websocket.Conn 29 Follows Set[string] 30 Reposts Set[string] 31} 32 33type NotificationMessage struct { 34 Liked bool `json:"liked"` 35 ByDid string `json:"did"` 36 RepostURI string `json:"repost_uri"` 37} 38 39// Global state 40var ( 41 subscribers = make(map[string]*SubscriberData) 42 subscribersMux sync.RWMutex 43 44 likeStream *client.Client 45 subscriberStream *client.Client 46 47 xrpcClient *xrpc.Client 48 49 upgrader = websocket.Upgrader{ 50 CheckOrigin: func(r *http.Request) bool { 51 return true 52 }, 53 } 54 55 logger *slog.Logger 56) 57 58func main() { 59 logger = slog.Default() 60 61 xrpcClient = &xrpc.Client{ 62 Client: &http.Client{ 63 Timeout: 30 * time.Second, 64 }, 65 Host: "https://bsky.social", 66 } 67 68 if err := initializeJetstreams(); err != nil { 69 log.Fatalf("cannot start jetstream: %s", err) 70 } 71 72 r := mux.NewRouter() 73 r.HandleFunc("/subscribe/{did}", handleSubscribe).Methods("GET") 74 75 log.Println("Server starting on :8080") 76 if err := http.ListenAndServe(":8080", r); err != nil { 77 log.Fatalf("error while serving: %s", err) 78 } 79} 80 81func handleSubscribe(w http.ResponseWriter, r *http.Request) { 82 vars := mux.Vars(r) 83 did := vars["did"] 84 85 conn, err := upgrader.Upgrade(w, r, nil) 86 if err != nil { 87 logger.Error("WebSocket upgrade failed", "error", err) 88 return 89 } 90 defer conn.Close() 91 92 
logger.Info("New subscriber", "did", did) 93 94 follows, err := fetchFollows(r.Context(), did) 95 if err != nil { 96 logger.Error("Error fetching follows", "did", did, "error", err) 97 return 98 } 99 100 reposts, err := fetchReposts(r.Context(), did) 101 if err != nil { 102 logger.Error("Error fetching reposts", "did", did, "error", err) 103 return 104 } 105 106 // Store subscriber data 107 subscriber := &SubscriberData{ 108 DID: did, 109 Conn: conn, 110 Follows: follows, 111 Reposts: reposts, 112 } 113 114 subscribersMux.Lock() 115 subscribers[did] = subscriber 116 subscribersMux.Unlock() 117 updateSubscriberStreamOpts() 118 // delete subscriber after we are done 119 defer func() { 120 subscribersMux.Lock() 121 delete(subscribers, did) 122 subscribersMux.Unlock() 123 updateSubscriberStreamOpts() 124 }() 125 126 for { 127 _, _, err := conn.ReadMessage() 128 if err != nil { 129 logger.Info("WebSocket connection closed", "did", did, "error", err) 130 break 131 } 132 } 133} 134 135func fetchReposts(ctx context.Context, did string) (Set[string], error) { 136 all := make(Set[string]) 137 cursor := "" 138 139 for { 140 out, err := atproto.RepoListRecords(ctx, &xrpc.Client{}, "app.bsky.feed.repost", cursor, 100, did, false) 141 if err != nil { 142 return nil, err 143 } 144 145 for _, record := range out.Records { 146 all[record.Uri] = struct{}{} 147 } 148 149 if out.Cursor == nil || *out.Cursor == "" { 150 break 151 } 152 cursor = *out.Cursor 153 } 154 155 return all, nil 156} 157 158func fetchFollows(ctx context.Context, did string) (Set[string], error) { 159 all := make(Set[string]) 160 cursor := "" 161 162 for { 163 out, err := bsky.GraphGetFollows(ctx, &xrpc.Client{}, did, cursor, 100) 164 if err != nil { 165 return nil, err 166 } 167 168 for _, record := range out.Follows { 169 all[record.Did] = struct{}{} 170 } 171 172 if out.Cursor == nil || *out.Cursor == "" { 173 break 174 } 175 cursor = *out.Cursor 176 } 177 178 return all, nil 179} 180 181func 
initializeJetstreams() error { 182 if err := startLikeClient(); err != nil { 183 return fmt.Errorf("like stream: %w", err) 184 } 185 if err := startSubscriberClient(); err != nil { 186 return fmt.Errorf("subscriber stream: %w", err) 187 } 188 return nil 189} 190 191func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload { 192 return models.SubscriberOptionsUpdatePayload{ 193 WantedCollections: []string{"app.bsky.feed.like"}, 194 WantedDIDs: getFollowsDids(), 195 } 196} 197 198func getSubscriberStreamOpts() models.SubscriberOptionsUpdatePayload { 199 return models.SubscriberOptionsUpdatePayload{ 200 WantedCollections: []string{"app.bsky.feed.repost", "app.bsky.graph.follow"}, 201 WantedDIDs: getSubscriberDids(), 202 } 203} 204 205func updateLikeStreamOpts() { 206 err := likeStream.SendOptionsUpdate(getLikeStreamOpts()) 207 if err != nil { 208 // reinit like stream 209 } 210} 211 212func updateSubscriberStreamOpts() { 213 err := subscriberStream.SendOptionsUpdate(getSubscriberStreamOpts()) 214 if err != nil { 215 // reinit subscriber stream 216 } 217} 218 219func startLikeClient() error { 220 opts := getLikeStreamOpts() 221 if len(opts.WantedDIDs) == 0 { 222 return nil // No follows to track 223 } 224 225 handler := &likeHandler{} 226 var err error 227 likeStream, err = startJetstreamClient("like_tracker", opts, handler.HandleEvent) 228 if err != nil { 229 return err 230 } 231 232 return nil 233} 234 235func startSubscriberClient() error { 236 opts := getSubscriberStreamOpts() 237 if len(opts.WantedDIDs) == 0 { 238 return nil // No subscribers to track 239 } 240 241 handler := &subscriberHandler{} 242 var err error 243 subscriberStream, err = startJetstreamClient("subscriber", opts, handler.HandleEvent) 244 if err != nil { 245 return err 246 } 247 248 return nil 249} 250 251func startJetstreamClient(name string, opts models.SubscriberOptionsUpdatePayload, handleEvent func(context.Context, *models.Event) error) (*client.Client, error) { 252 ctx := 
context.Background() 253 254 config := client.DefaultClientConfig() 255 config.WebsocketURL = "wss://jetstream.atproto.tools/subscribe" 256 config.Compress = true 257 config.WantedCollections = opts.WantedCollections 258 config.WantedDids = opts.WantedDIDs 259 260 scheduler := sequential.NewScheduler(name, logger, handleEvent) 261 262 c, err := client.NewClient(config, logger, scheduler) 263 if err != nil { 264 logger.Error("Failed to create client", "name", name, "error", err) 265 return nil, err 266 } 267 268 cursor := time.Now().UnixMicro() 269 270 logger.Info("Starting client", "name", name, "collections", opts.WantedCollections, "wanted_dids", len(opts.WantedDIDs)) 271 if err := c.ConnectAndRead(ctx, &cursor); err != nil { 272 logger.Error("Client failed", "name", name, "error", err) 273 return nil, err 274 } 275 276 return c, nil 277} 278 279func getFollowsDids() []string { 280 subscribersMux.RLock() 281 defer subscribersMux.RUnlock() 282 283 var dids []string 284 for _, subscriber := range subscribers { 285 for follow, _ := range subscriber.Follows { 286 dids = append(dids, follow) 287 } 288 } 289 290 return dids 291} 292 293func getSubscriberDids() []string { 294 subscribersMux.RLock() 295 defer subscribersMux.RUnlock() 296 297 var dids []string 298 for did := range subscribers { 299 dids = append(dids, did) 300 } 301 302 return dids 303} 304 305type likeHandler struct{} 306 307func (h *likeHandler) HandleEvent(ctx context.Context, event *models.Event) error { 308 var like bsky.FeedLike 309 if err := json.Unmarshal(event.Commit.Record, &like); err != nil { 310 logger.Error("Failed to unmarshal like", "error", err) 311 return nil 312 } 313 314 subscribersMux.RLock() 315 defer subscribersMux.RUnlock() 316 317 for _, subscriber := range subscribers { 318 for repostURI, _ := range subscriber.Reposts { 319 // (un)liked a post the subscriber reposted 320 if like.Subject.Uri == repostURI { 321 notification := NotificationMessage{ 322 Liked: event.Commit.Operation 
!= models.CommitOperationDelete, 323 ByDid: event.Did, 324 RepostURI: repostURI, 325 } 326 327 if err := subscriber.Conn.WriteJSON(notification); err != nil { 328 logger.Error("Failed to send notification", "subscriber", subscriber.DID, "error", err) 329 } 330 } 331 } 332 } 333 334 return nil 335} 336 337type subscriberHandler struct{} 338 339func (h *subscriberHandler) HandleEvent(ctx context.Context, event *models.Event) error { 340 switch event.Commit.Collection { 341 case "app.bsky.feed.repost": 342 modifySubscribersWithEvent( 343 event, 344 func(s *SubscriberData, r bsky.FeedRepost) { delete(s.Reposts, r.Subject.Uri) }, 345 func(s *SubscriberData, r bsky.FeedRepost) { 346 s.Reposts[r.Subject.Uri] = struct{}{} 347 }, 348 ) 349 case "app.bsky.graph.follow": 350 modifySubscribersWithEvent( 351 event, 352 func(s *SubscriberData, r bsky.GraphFollow) { delete(s.Follows, r.Subject) }, 353 func(s *SubscriberData, r bsky.GraphFollow) { 354 s.Follows[r.Subject] = struct{}{} 355 }, 356 ) 357 updateLikeStreamOpts() 358 } 359 360 return nil 361} 362 363type ModifyFunc[v any] func(*SubscriberData, v) 364 365func modifySubscribersWithEvent[v any](event *models.Event, onDelete ModifyFunc[v], onUpdate ModifyFunc[v]) error { 366 var data v 367 if err := json.Unmarshal(event.Commit.Record, &data); err != nil { 368 logger.Error("Failed to unmarshal repost", "error", err) 369 return nil 370 } 371 372 subscribersMux.Lock() 373 defer subscribersMux.Unlock() 374 375 if subscriber, exists := subscribers[event.Did]; exists { 376 if event.Commit.Operation == models.CommitOperationDelete { 377 onDelete(subscriber, data) 378 } else { 379 onUpdate(subscriber, data) 380 } 381 } 382 383 return nil 384}