its for when you want to get like notifications for your reposts
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "log" 7 "log/slog" 8 "net/http" 9 10 "github.com/bluesky-social/indigo/api/bsky" 11 "github.com/bluesky-social/indigo/xrpc" 12 "github.com/bluesky-social/jetstream/pkg/client" 13 "github.com/bluesky-social/jetstream/pkg/models" 14 "github.com/cornelk/hashmap" 15 "github.com/gorilla/mux" 16 "github.com/gorilla/websocket" 17) 18 19type Set[T comparable] map[T]struct{} 20 21// Data structures 22type SubscriberData struct { 23 DID string 24 Conn *websocket.Conn 25 ListenTo Set[string] 26 Reposts Set[string] 27} 28 29type NotificationMessage struct { 30 Liked bool `json:"liked"` 31 ByDid string `json:"did"` 32 RepostURI string `json:"repost_uri"` 33} 34 35// Global state 36var ( 37 subscribers = hashmap.New[string, *SubscriberData]() 38 39 likeStream *client.Client 40 subscriberStream *client.Client 41 42 upgrader = websocket.Upgrader{ 43 CheckOrigin: func(r *http.Request) bool { 44 return true 45 }, 46 } 47 48 logger *slog.Logger 49) 50 51func getFollowsDids() []string { 52 var dids []string 53 subscribers.Range(func(s string, sd *SubscriberData) bool { 54 for follow, _ := range sd.ListenTo { 55 dids = append(dids, follow) 56 } 57 return true 58 }) 59 return dids 60} 61 62func getSubscriberDids() []string { 63 dids := make([]string, 0, subscribers.Len()) 64 subscribers.Range(func(s string, sd *SubscriberData) bool { 65 dids = append(dids, s) 66 return true 67 }) 68 return dids 69} 70 71func main() { 72 logger = slog.Default() 73 74 go likeStreamLoop(logger) 75 go subscriberStreamLoop(logger) 76 77 r := mux.NewRouter() 78 r.HandleFunc("/subscribe/{did}", handleSubscribe).Methods("GET") 79 80 log.Println("Server starting on :8080") 81 if err := http.ListenAndServe(":8080", r); err != nil { 82 log.Fatalf("error while serving: %s", err) 83 } 84} 85 86func handleSubscribe(w http.ResponseWriter, r *http.Request) { 87 vars := mux.Vars(r) 88 did := vars["did"] 89 90 logger = logger.With("did", did) 91 92 conn, err := upgrader.Upgrade(w, r, nil) 93 if err != nil { 94 logger.Error("WebSocket upgrade failed", "error", err) 95 return 96 } 97 defer conn.Close() 98 99 logger.Info("new subscriber") 100 101 pdsURI, err := findUserPDS(r.Context(), did) 102 if err != nil { 103 logger.Error("cant resolve user pds", "error", err) 104 return 105 } 106 logger = logger.With("pds", pdsURI) 107 108 xrpcClient := &xrpc.Client{ 109 Host: pdsURI, 110 } 111 // todo: implement skipping fetching follows and allow specifying users to listen to via websocket 112 follows, err := fetchFollows(r.Context(), xrpcClient, did) 113 if err != nil { 114 logger.Error("error fetching follows", "error", err) 115 return 116 } 117 logger.Info("fetched follows") 118 reposts, err := fetchReposts(r.Context(), xrpcClient, did) 119 if err != nil { 120 logger.Error("error fetching reposts", "error", err) 121 return 122 } 123 logger.Info("fetched reposts") 124 125 subscriber := &SubscriberData{ 126 DID: did, 127 Conn: conn, 128 // use user follows as default listen to 129 ListenTo: follows, 130 Reposts: reposts, 131 } 132 133 subscribers.Set(did, subscriber) 134 updateSubscriberStreamOpts() 135 updateLikeStreamOpts() 136 // delete subscriber after we are done 137 defer func() { 138 subscribers.Del(did) 139 updateSubscriberStreamOpts() 140 updateLikeStreamOpts() 141 }() 142 143 logger.Info("serving subscriber") 144 145 for { 146 _, _, err := conn.ReadMessage() 147 if err != nil { 148 logger.Info("WebSocket connection closed", "error", err) 149 break 150 } 151 } 152} 153 154func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload { 155 return models.SubscriberOptionsUpdatePayload{ 156 WantedCollections: []string{"app.bsky.feed.like"}, 157 WantedDIDs: getFollowsDids(), 158 } 159} 160 161func getSubscriberStreamOpts() models.SubscriberOptionsUpdatePayload { 162 return models.SubscriberOptionsUpdatePayload{ 163 WantedCollections: []string{"app.bsky.feed.repost", "app.bsky.graph.follow"}, 164 WantedDIDs: getSubscriberDids(), 165 } 166} 167 168func updateLikeStreamOpts() { 169 opts := getLikeStreamOpts() 170 err := likeStream.SendOptionsUpdate(opts) 171 if err != nil { 172 logger.Error("couldnt update like stream opts", "error", err) 173 return 174 } 175 logger.Info("updated like stream opts", "requestedDids", len(opts.WantedDIDs)) 176} 177 178func updateSubscriberStreamOpts() { 179 opts := getSubscriberStreamOpts() 180 err := subscriberStream.SendOptionsUpdate(opts) 181 if err != nil { 182 logger.Error("couldnt update subscriber stream opts", "error", err) 183 return 184 } 185 logger.Info("updated subscriber stream opts", "userCount", len(opts.WantedDIDs)) 186} 187 188func likeStreamLoop(logger *slog.Logger) { 189 startJetstreamLoop(logger, &likeStream, "like_tracker", HandleLikeEvent, getLikeStreamOpts) 190} 191 192func subscriberStreamLoop(logger *slog.Logger) { 193 startJetstreamLoop(logger, &subscriberStream, "subscriber", HandleSubscriberEvent, getSubscriberStreamOpts) 194} 195 196func HandleLikeEvent(ctx context.Context, event *models.Event) error { 197 if event == nil || event.Commit == nil || len(event.Commit.Record) == 0 { 198 return nil 199 } 200 201 var like bsky.FeedLike 202 if err := json.Unmarshal(event.Commit.Record, &like); err != nil { 203 logger.Error("Failed to unmarshal like", "error", err) 204 return nil 205 } 206 207 subscribers.Range(func(s string, sd *SubscriberData) bool { 208 for repostURI, _ := range sd.Reposts { 209 // (un)liked a post the subscriber reposted 210 if like.Subject.Uri == repostURI { 211 notification := NotificationMessage{ 212 Liked: event.Commit.Operation != models.CommitOperationDelete, 213 ByDid: event.Did, 214 RepostURI: repostURI, 215 } 216 217 if err := sd.Conn.WriteJSON(notification); err != nil { 218 logger.Error("Failed to send notification", "subscriber", sd.DID, "error", err) 219 } 220 } 221 } 222 return true 223 }) 224 225 return nil 226} 227 228func HandleSubscriberEvent(ctx context.Context, event *models.Event) error { 229 if event == nil || event.Commit == nil { 230 return nil 231 } 232 233 switch event.Commit.Collection { 234 case "app.bsky.feed.repost": 235 modifySubscribersWithEvent( 236 event, 237 func(s *SubscriberData, r bsky.FeedRepost) { delete(s.Reposts, r.Subject.Uri) }, 238 func(s *SubscriberData, r bsky.FeedRepost) { 239 s.Reposts[r.Subject.Uri] = struct{}{} 240 }, 241 ) 242 case "app.bsky.graph.follow": 243 modifySubscribersWithEvent( 244 event, 245 func(s *SubscriberData, r bsky.GraphFollow) { delete(s.ListenTo, r.Subject) }, 246 func(s *SubscriberData, r bsky.GraphFollow) { 247 s.ListenTo[r.Subject] = struct{}{} 248 }, 249 ) 250 } 251 252 return nil 253} 254 255type ModifyFunc[v any] func(*SubscriberData, v) 256 257func modifySubscribersWithEvent[v any](event *models.Event, onDelete ModifyFunc[v], onUpdate ModifyFunc[v]) error { 258 if len(event.Commit.Record) == 0 { 259 return nil 260 } 261 262 var data v 263 if err := json.Unmarshal(event.Commit.Record, &data); err != nil { 264 logger.Error("Failed to unmarshal repost", "error", err, "raw", event.Commit.Record) 265 return nil 266 } 267 268 if subscriber, exists := subscribers.Get(event.Did); exists { 269 if event.Commit.Operation == models.CommitOperationDelete { 270 onDelete(subscriber, data) 271 } else { 272 onUpdate(subscriber, data) 273 } 274 } 275 276 return nil 277}