its for when you want to get like notifications for your reposts
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "log" 7 "log/slog" 8 "net/http" 9 10 "github.com/bluesky-social/indigo/api/bsky" 11 "github.com/bluesky-social/indigo/xrpc" 12 "github.com/bluesky-social/jetstream/pkg/client" 13 "github.com/bluesky-social/jetstream/pkg/models" 14 "github.com/cornelk/hashmap" 15 "github.com/gorilla/mux" 16 "github.com/gorilla/websocket" 17) 18 19type Set[T comparable] map[T]struct{} 20 21const ListenTypeNone = "none" 22const ListenTypeFollows = "follows" 23 24type SubscriberData struct { 25 DID string 26 Conn *websocket.Conn 27 ListenType string 28 ListenTo Set[string] 29 Reposts Set[string] 30} 31 32type NotificationMessage struct { 33 Liked bool `json:"liked"` 34 ByDid string `json:"did"` 35 RepostURI string `json:"repost_uri"` 36} 37 38type SubscriberMessage struct { 39 Type string `json:"type"` 40 Content json.RawMessage `json:"content"` 41} 42 43type SubscriberUpdateListenTo struct { 44 ListenTo []string `json:"listen_to"` 45} 46 47var ( 48 // storing the subscriber data in both Should Be Fine 49 // we dont modify subscriber data at the same time in two places 50 subscribers = hashmap.New[string, *SubscriberData]() 51 listeningTo = hashmap.New[string, *hashmap.Map[string, *SubscriberData]]() 52 53 likeStream *client.Client 54 subscriberStream *client.Client 55 56 upgrader = websocket.Upgrader{ 57 CheckOrigin: func(r *http.Request) bool { 58 return true 59 }, 60 } 61 62 logger *slog.Logger 63) 64 65func getSubscriberDids() []string { 66 dids := make([]string, 0, subscribers.Len()) 67 subscribers.Range(func(s string, sd *SubscriberData) bool { 68 dids = append(dids, s) 69 return true 70 }) 71 return dids 72} 73 74func listenTo(sd *SubscriberData, did string) { 75 targetDids, _ := listeningTo.GetOrInsert(did, hashmap.New[string, *SubscriberData]()) 76 targetDids.Insert(sd.DID, sd) 77} 78 79func stopListeningTo(subscriberDid, did string) { 80 if targetDids, exists := listeningTo.Get(did); exists { 81 targetDids.Del(subscriberDid) 82 } 83} 84 85func main() { 86 logger = slog.Default() 87 88 go startJetstreamLoop(logger, &likeStream, "like_tracker", HandleLikeEvent, getLikeStreamOpts) 89 go startJetstreamLoop(logger, &subscriberStream, "subscriber", HandleSubscriberEvent, getSubscriberStreamOpts) 90 91 r := mux.NewRouter() 92 r.HandleFunc("/subscribe/{did}", handleSubscribe).Methods("GET") 93 94 log.Println("server starting on :8080") 95 if err := http.ListenAndServe(":8080", r); err != nil { 96 log.Fatalf("error while serving: %s", err) 97 } 98} 99 100func handleSubscribe(w http.ResponseWriter, r *http.Request) { 101 vars := mux.Vars(r) 102 did := vars["did"] 103 104 query := r.URL.Query() 105 // "follows", everything else is considered as "none" 106 listenType := query.Get("listenTo") 107 if len(listenType) == 0 { 108 listenType = ListenTypeFollows 109 } 110 111 logger = logger.With("did", did) 112 113 conn, err := upgrader.Upgrade(w, r, nil) 114 if err != nil { 115 logger.Error("WebSocket upgrade failed", "error", err) 116 return 117 } 118 defer conn.Close() 119 120 logger.Info("new subscriber") 121 122 pdsURI, err := findUserPDS(r.Context(), did) 123 if err != nil { 124 logger.Error("cant resolve user pds", "error", err) 125 return 126 } 127 logger = logger.With("pds", pdsURI) 128 129 xrpcClient := &xrpc.Client{ 130 Host: pdsURI, 131 } 132 133 var subbedTo Set[string] 134 135 switch listenType { 136 case ListenTypeFollows: 137 follows, err := fetchFollows(r.Context(), xrpcClient, did) 138 if err != nil { 139 logger.Error("error fetching follows", "error", err) 140 return 141 } 142 logger.Info("fetched follows") 143 subbedTo = follows 144 case ListenTypeNone: 145 subbedTo = make(Set[string]) 146 default: 147 logger.Error("invalid listen type", "requestedType", listenType) 148 return 149 } 150 151 reposts, err := fetchReposts(r.Context(), xrpcClient, did) 152 if err != nil { 153 logger.Error("error fetching reposts", "error", err) 154 return 155 } 156 logger.Info("fetched reposts") 157 158 sd := &SubscriberData{ 159 DID: did, 160 Conn: conn, 161 ListenType: listenType, 162 ListenTo: subbedTo, 163 Reposts: reposts, 164 } 165 166 subscribers.Set(sd.DID, sd) 167 for listenDid := range sd.ListenTo { 168 listenTo(sd, listenDid) 169 } 170 171 updateSubscriberStreamOpts() 172 updateLikeStreamOpts() 173 // delete subscriber after we are done 174 defer func() { 175 for listenDid := range sd.ListenTo { 176 stopListeningTo(sd.DID, listenDid) 177 } 178 subscribers.Del(sd.DID) 179 180 updateSubscriberStreamOpts() 181 updateLikeStreamOpts() 182 }() 183 184 logger.Info("serving subscriber") 185 186 for { 187 var msg SubscriberMessage 188 err := conn.ReadJSON(&msg) 189 if err != nil { 190 logger.Info("WebSocket connection closed", "error", err) 191 break 192 } 193 switch msg.Type { 194 case "update_listen_to": 195 // only allow this if we arent managing listen to 196 if sd.ListenType != ListenTypeNone { 197 continue 198 } 199 200 var innerMsg SubscriberUpdateListenTo 201 if err := json.Unmarshal(msg.Content, &innerMsg); err != nil { 202 logger.Info("invalid message", "error", err) 203 break 204 } 205 // remove all current listens and add the ones the user requested 206 for listenDid := range sd.ListenTo { 207 stopListeningTo(sd.DID, listenDid) 208 delete(sd.ListenTo, listenDid) 209 } 210 for _, listenDid := range innerMsg.ListenTo { 211 sd.ListenTo[listenDid] = struct{}{} 212 listenTo(sd, listenDid) 213 } 214 } 215 } 216} 217 218func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload { 219 return models.SubscriberOptionsUpdatePayload{ 220 WantedCollections: []string{"app.bsky.feed.like"}, 221 // WantedDIDs: getFollowsDids(), 222 } 223} 224 225func getSubscriberStreamOpts() models.SubscriberOptionsUpdatePayload { 226 return models.SubscriberOptionsUpdatePayload{ 227 WantedCollections: []string{"app.bsky.feed.repost", "app.bsky.graph.follow"}, 228 WantedDIDs: getSubscriberDids(), 229 } 230} 231 232func updateLikeStreamOpts() { 233 opts := getLikeStreamOpts() 234 err := likeStream.SendOptionsUpdate(opts) 235 if err != nil { 236 logger.Error("couldnt update like stream opts", "error", err) 237 return 238 } 239 logger.Info("updated like stream opts", "requestedDids", len(opts.WantedDIDs)) 240} 241 242func updateSubscriberStreamOpts() { 243 opts := getSubscriberStreamOpts() 244 err := subscriberStream.SendOptionsUpdate(opts) 245 if err != nil { 246 logger.Error("couldnt update subscriber stream opts", "error", err) 247 return 248 } 249 logger.Info("updated subscriber stream opts", "userCount", len(opts.WantedDIDs)) 250} 251 252func HandleLikeEvent(ctx context.Context, event *models.Event) error { 253 if event == nil || event.Commit == nil || len(event.Commit.Record) == 0 { 254 return nil 255 } 256 257 // skip handling event if its not from a source we are listening to 258 targets, exists := listeningTo.Get(event.Did) 259 if !exists { 260 return nil 261 } 262 263 var like bsky.FeedLike 264 if err := json.Unmarshal(event.Commit.Record, &like); err != nil { 265 logger.Error("failed to unmarshal like", "error", err) 266 return nil 267 } 268 269 targets.Range(func(s string, sd *SubscriberData) bool { 270 for repostURI, _ := range sd.Reposts { 271 // (un)liked a post the subscriber reposted 272 if like.Subject.Uri == repostURI { 273 notification := NotificationMessage{ 274 Liked: event.Commit.Operation != models.CommitOperationDelete, 275 ByDid: event.Did, 276 RepostURI: repostURI, 277 } 278 279 if err := sd.Conn.WriteJSON(notification); err != nil { 280 logger.Error("failed to send notification", "subscriber", sd.DID, "error", err) 281 } 282 } 283 } 284 return true 285 }) 286 287 return nil 288} 289 290func HandleSubscriberEvent(ctx context.Context, event *models.Event) error { 291 if event == nil || event.Commit == nil { 292 return nil 293 } 294 295 switch event.Commit.Collection { 296 case "app.bsky.feed.repost": 297 modifySubscribersWithEvent( 298 event, 299 func(s *SubscriberData, r bsky.FeedRepost, deleted bool) { 300 if deleted { 301 delete(s.Reposts, r.Subject.Uri) 302 } else { 303 s.Reposts[r.Subject.Uri] = struct{}{} 304 } 305 }, 306 ) 307 case "app.bsky.graph.follow": 308 modifySubscribersWithEvent( 309 event, 310 func(s *SubscriberData, r bsky.GraphFollow, deleted bool) { 311 // if we arent managing then we dont need to update anything 312 if s.ListenType != ListenTypeFollows { 313 return 314 } 315 if deleted { 316 stopListeningTo(s.DID, r.Subject) 317 delete(s.ListenTo, r.Subject) 318 } else { 319 s.ListenTo[r.Subject] = struct{}{} 320 listenTo(s, r.Subject) 321 } 322 }, 323 ) 324 } 325 326 return nil 327} 328 329type ModifyFunc[v any] func(*SubscriberData, v, bool) 330 331func modifySubscribersWithEvent[v any](event *models.Event, handle ModifyFunc[v]) error { 332 if len(event.Commit.Record) == 0 { 333 return nil 334 } 335 336 var data v 337 if err := json.Unmarshal(event.Commit.Record, &data); err != nil { 338 logger.Error("failed to unmarshal repost", "error", err, "raw", event.Commit.Record) 339 return nil 340 } 341 342 if subscriber, exists := subscribers.Get(event.Did); exists { 343 handle(subscriber, data, event.Commit.Operation == models.CommitOperationDelete) 344 } 345 346 return nil 347}