its for when you want to get like notifications for your reposts
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "log" 7 "log/slog" 8 "net/http" 9 "sync" 10 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/xrpc" 13 "github.com/bluesky-social/jetstream/pkg/client" 14 "github.com/bluesky-social/jetstream/pkg/models" 15 "github.com/gorilla/mux" 16 "github.com/gorilla/websocket" 17) 18 19type Set[T comparable] map[T]struct{} 20 21// Data structures 22type SubscriberData struct { 23 DID string 24 Conn *websocket.Conn 25 ListenTo Set[string] 26 Reposts Set[string] 27} 28 29type NotificationMessage struct { 30 Liked bool `json:"liked"` 31 ByDid string `json:"did"` 32 RepostURI string `json:"repost_uri"` 33} 34 35// Global state 36var ( 37 subscribers = make(map[string]*SubscriberData) 38 subscribersMux sync.RWMutex 39 40 likeStream *client.Client 41 subscriberStream *client.Client 42 43 upgrader = websocket.Upgrader{ 44 CheckOrigin: func(r *http.Request) bool { 45 return true 46 }, 47 } 48 49 logger *slog.Logger 50) 51 52func getFollowsDids() []string { 53 subscribersMux.RLock() 54 defer subscribersMux.RUnlock() 55 56 var dids []string 57 for _, subscriber := range subscribers { 58 for follow, _ := range subscriber.ListenTo { 59 dids = append(dids, follow) 60 } 61 } 62 63 return dids 64} 65 66func getSubscriberDids() []string { 67 subscribersMux.RLock() 68 defer subscribersMux.RUnlock() 69 70 var dids []string 71 for did := range subscribers { 72 dids = append(dids, did) 73 } 74 75 return dids 76} 77 78func main() { 79 logger = slog.Default() 80 81 go likeStreamLoop(logger) 82 go subscriberStreamLoop(logger) 83 84 r := mux.NewRouter() 85 r.HandleFunc("/subscribe/{did}", handleSubscribe).Methods("GET") 86 87 log.Println("Server starting on :8080") 88 if err := http.ListenAndServe(":8080", r); err != nil { 89 log.Fatalf("error while serving: %s", err) 90 } 91} 92 93func handleSubscribe(w http.ResponseWriter, r *http.Request) { 94 vars := mux.Vars(r) 95 did := vars["did"] 96 97 logger = logger.With("did", did) 98 99 conn, err := upgrader.Upgrade(w, r, nil) 100 if err != nil { 101 logger.Error("WebSocket upgrade failed", "error", err) 102 return 103 } 104 defer conn.Close() 105 106 logger.Info("new subscriber") 107 108 pdsURI, err := findUserPDS(r.Context(), did) 109 if err != nil { 110 logger.Error("cant resolve user pds", "error", err) 111 return 112 } 113 logger = logger.With("pds", pdsURI) 114 115 xrpcClient := &xrpc.Client{ 116 Host: pdsURI, 117 } 118 // todo: implement skipping fetching follows and allow specifying users to listen to via websocket 119 follows, err := fetchFollows(r.Context(), xrpcClient, did) 120 if err != nil { 121 logger.Error("error fetching follows", "error", err) 122 return 123 } 124 logger.Info("fetched follows") 125 reposts, err := fetchReposts(r.Context(), xrpcClient, did) 126 if err != nil { 127 logger.Error("error fetching reposts", "error", err) 128 return 129 } 130 logger.Info("fetched reposts") 131 132 subscriber := &SubscriberData{ 133 DID: did, 134 Conn: conn, 135 // use user follows as default listen to 136 ListenTo: follows, 137 Reposts: reposts, 138 } 139 140 subscribersMux.Lock() 141 subscribers[did] = subscriber 142 subscribersMux.Unlock() 143 updateSubscriberStreamOpts() 144 updateLikeStreamOpts() 145 // delete subscriber after we are done 146 defer func() { 147 subscribersMux.Lock() 148 delete(subscribers, did) 149 subscribersMux.Unlock() 150 updateSubscriberStreamOpts() 151 updateLikeStreamOpts() 152 }() 153 154 logger.Info("serving subscriber") 155 156 for { 157 _, _, err := conn.ReadMessage() 158 if err != nil { 159 logger.Info("WebSocket connection closed", "error", err) 160 break 161 } 162 } 163} 164 165func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload { 166 return models.SubscriberOptionsUpdatePayload{ 167 WantedCollections: []string{"app.bsky.feed.like"}, 168 WantedDIDs: getFollowsDids(), 169 } 170} 171 172func getSubscriberStreamOpts() models.SubscriberOptionsUpdatePayload { 173 return models.SubscriberOptionsUpdatePayload{ 174 WantedCollections: []string{"app.bsky.feed.repost", "app.bsky.graph.follow"}, 175 WantedDIDs: getSubscriberDids(), 176 } 177} 178 179func updateLikeStreamOpts() { 180 opts := getLikeStreamOpts() 181 err := likeStream.SendOptionsUpdate(opts) 182 if err != nil { 183 logger.Error("couldnt update like stream opts", "error", err) 184 return 185 } 186 logger.Info("updated like stream opts", "requestedDids", len(opts.WantedDIDs)) 187} 188 189func updateSubscriberStreamOpts() { 190 opts := getSubscriberStreamOpts() 191 err := subscriberStream.SendOptionsUpdate(opts) 192 if err != nil { 193 logger.Error("couldnt update subscriber stream opts", "error", err) 194 return 195 } 196 logger.Info("updated subscriber stream opts", "userCount", len(opts.WantedDIDs)) 197} 198 199func likeStreamLoop(logger *slog.Logger) { 200 startJetstreamLoop(logger, &likeStream, "like_tracker", HandleLikeEvent, getLikeStreamOpts) 201} 202 203func subscriberStreamLoop(logger *slog.Logger) { 204 startJetstreamLoop(logger, &subscriberStream, "subscriber", HandleSubscriberEvent, getSubscriberStreamOpts) 205} 206 207func HandleLikeEvent(ctx context.Context, event *models.Event) error { 208 if event == nil || event.Commit == nil || len(event.Commit.Record) == 0 { 209 return nil 210 } 211 212 var like bsky.FeedLike 213 if err := json.Unmarshal(event.Commit.Record, &like); err != nil { 214 logger.Error("Failed to unmarshal like", "error", err) 215 return nil 216 } 217 218 subscribersMux.RLock() 219 defer subscribersMux.RUnlock() 220 221 for _, subscriber := range subscribers { 222 for repostURI, _ := range subscriber.Reposts { 223 // (un)liked a post the subscriber reposted 224 if like.Subject.Uri == repostURI { 225 notification := NotificationMessage{ 226 Liked: event.Commit.Operation != models.CommitOperationDelete, 227 ByDid: event.Did, 228 RepostURI: repostURI, 229 } 230 231 if err := subscriber.Conn.WriteJSON(notification); err != nil { 232 logger.Error("Failed to send notification", "subscriber", subscriber.DID, "error", err) 233 } 234 } 235 } 236 } 237 238 return nil 239} 240 241func HandleSubscriberEvent(ctx context.Context, event *models.Event) error { 242 if event == nil || event.Commit == nil { 243 return nil 244 } 245 246 switch event.Commit.Collection { 247 case "app.bsky.feed.repost": 248 modifySubscribersWithEvent( 249 event, 250 func(s *SubscriberData, r bsky.FeedRepost) { delete(s.Reposts, r.Subject.Uri) }, 251 func(s *SubscriberData, r bsky.FeedRepost) { 252 s.Reposts[r.Subject.Uri] = struct{}{} 253 }, 254 ) 255 case "app.bsky.graph.follow": 256 modifySubscribersWithEvent( 257 event, 258 func(s *SubscriberData, r bsky.GraphFollow) { delete(s.ListenTo, r.Subject) }, 259 func(s *SubscriberData, r bsky.GraphFollow) { 260 s.ListenTo[r.Subject] = struct{}{} 261 }, 262 ) 263 } 264 265 return nil 266} 267 268type ModifyFunc[v any] func(*SubscriberData, v) 269 270func modifySubscribersWithEvent[v any](event *models.Event, onDelete ModifyFunc[v], onUpdate ModifyFunc[v]) error { 271 if len(event.Commit.Record) == 0 { 272 return nil 273 } 274 275 var data v 276 if err := json.Unmarshal(event.Commit.Record, &data); err != nil { 277 logger.Error("Failed to unmarshal repost", "error", err, "raw", event.Commit.Record) 278 return nil 279 } 280 281 subscribersMux.Lock() 282 defer subscribersMux.Unlock() 283 284 if subscriber, exists := subscribers[event.Did]; exists { 285 if event.Commit.Operation == models.CommitOperationDelete { 286 onDelete(subscriber, data) 287 } else { 288 onUpdate(subscriber, data) 289 } 290 } 291 292 return nil 293}