It's for when you want to get notifications about likes on posts you have reposted.
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "log"
7 "log/slog"
8 "net/http"
9
10 "github.com/bluesky-social/indigo/api/bsky"
11 "github.com/bluesky-social/indigo/xrpc"
12 "github.com/bluesky-social/jetstream/pkg/client"
13 "github.com/bluesky-social/jetstream/pkg/models"
14 "github.com/cornelk/hashmap"
15 "github.com/gorilla/mux"
16 "github.com/gorilla/websocket"
17)
18
// Set is a minimal generic set: membership is represented by the presence of
// a key, and the zero-width struct{} value carries no data.
type Set[T comparable] map[T]struct{}
20
// Listen types accepted in the "listenTo" query parameter of /subscribe/{did}.
const (
	// ListenTypeNone means the server does not manage the watched DIDs; the
	// client supplies them through "update_listen_to" messages.
	ListenTypeNone = "none"
	// ListenTypeFollows means the watched DIDs are the subscriber's follows,
	// fetched on connect and updated from follow events.
	ListenTypeFollows = "follows"
)
23
24type SubscriberData struct {
25 DID string
26 Conn *websocket.Conn
27 ListenType string
28 ListenTo Set[string]
29 Reposts Set[string]
30}
31
// NotificationMessage is the JSON payload written to a subscriber when a
// watched account likes a post the subscriber has reposted.
type NotificationMessage struct {
	// Liked is false when the originating commit was a delete operation.
	Liked bool `json:"liked"`
	// ByDid is the DID of the account that produced the like event.
	ByDid string `json:"did"`
	// RepostURI is the URI of the liked post (one the subscriber reposted).
	RepostURI string `json:"repost_uri"`
}
37
// SubscriberMessage is the envelope for messages sent by a subscriber over
// the WebSocket.
type SubscriberMessage struct {
	// Type selects how Content is interpreted, e.g. "update_listen_to".
	Type string `json:"type"`
	// Content is the still-encoded payload; it is decoded once Type is known.
	Content json.RawMessage `json:"content"`
}
42
// SubscriberUpdateListenTo is the content of an "update_listen_to" message.
type SubscriberUpdateListenTo struct {
	// ListenTo is the complete new list of DIDs to watch; it replaces the
	// subscriber's previous list.
	ListenTo []string `json:"listen_to"`
}
46
47var (
48 // storing the subscriber data in both Should Be Fine
49 // we dont modify subscriber data at the same time in two places
50 subscribers = hashmap.New[string, *SubscriberData]()
51 listeningTo = hashmap.New[string, *hashmap.Map[string, *SubscriberData]]()
52
53 likeStream *client.Client
54 subscriberStream *client.Client
55
56 upgrader = websocket.Upgrader{
57 CheckOrigin: func(r *http.Request) bool {
58 return true
59 },
60 }
61
62 logger *slog.Logger
63)
64
65func getSubscriberDids() []string {
66 dids := make([]string, 0, subscribers.Len())
67 subscribers.Range(func(s string, sd *SubscriberData) bool {
68 dids = append(dids, s)
69 return true
70 })
71 return dids
72}
73
74func listenTo(sd *SubscriberData, did string) {
75 targetDids, _ := listeningTo.GetOrInsert(did, hashmap.New[string, *SubscriberData]())
76 targetDids.Insert(sd.DID, sd)
77}
78
79func stopListeningTo(subscriberDid, did string) {
80 if targetDids, exists := listeningTo.Get(did); exists {
81 targetDids.Del(subscriberDid)
82 }
83}
84
85func main() {
86 logger = slog.Default()
87
88 go startJetstreamLoop(logger, &likeStream, "like_tracker", HandleLikeEvent, getLikeStreamOpts)
89 go startJetstreamLoop(logger, &subscriberStream, "subscriber", HandleSubscriberEvent, getSubscriberStreamOpts)
90
91 r := mux.NewRouter()
92 r.HandleFunc("/subscribe/{did}", handleSubscribe).Methods("GET")
93
94 log.Println("server starting on :8080")
95 if err := http.ListenAndServe(":8080", r); err != nil {
96 log.Fatalf("error while serving: %s", err)
97 }
98}
99
100func handleSubscribe(w http.ResponseWriter, r *http.Request) {
101 vars := mux.Vars(r)
102 did := vars["did"]
103
104 query := r.URL.Query()
105 // "follows", everything else is considered as "none"
106 listenType := query.Get("listenTo")
107 if len(listenType) == 0 {
108 listenType = ListenTypeFollows
109 }
110
111 logger = logger.With("did", did)
112
113 conn, err := upgrader.Upgrade(w, r, nil)
114 if err != nil {
115 logger.Error("WebSocket upgrade failed", "error", err)
116 return
117 }
118 defer conn.Close()
119
120 logger.Info("new subscriber")
121
122 pdsURI, err := findUserPDS(r.Context(), did)
123 if err != nil {
124 logger.Error("cant resolve user pds", "error", err)
125 return
126 }
127 logger = logger.With("pds", pdsURI)
128
129 xrpcClient := &xrpc.Client{
130 Host: pdsURI,
131 }
132
133 var subbedTo Set[string]
134
135 switch listenType {
136 case ListenTypeFollows:
137 follows, err := fetchFollows(r.Context(), xrpcClient, did)
138 if err != nil {
139 logger.Error("error fetching follows", "error", err)
140 return
141 }
142 logger.Info("fetched follows")
143 subbedTo = follows
144 case ListenTypeNone:
145 subbedTo = make(Set[string])
146 default:
147 logger.Error("invalid listen type", "requestedType", listenType)
148 return
149 }
150
151 reposts, err := fetchReposts(r.Context(), xrpcClient, did)
152 if err != nil {
153 logger.Error("error fetching reposts", "error", err)
154 return
155 }
156 logger.Info("fetched reposts")
157
158 sd := &SubscriberData{
159 DID: did,
160 Conn: conn,
161 ListenType: listenType,
162 ListenTo: subbedTo,
163 Reposts: reposts,
164 }
165
166 subscribers.Set(sd.DID, sd)
167 for listenDid := range sd.ListenTo {
168 listenTo(sd, listenDid)
169 }
170
171 updateSubscriberStreamOpts()
172 updateLikeStreamOpts()
173 // delete subscriber after we are done
174 defer func() {
175 for listenDid := range sd.ListenTo {
176 stopListeningTo(sd.DID, listenDid)
177 }
178 subscribers.Del(sd.DID)
179
180 updateSubscriberStreamOpts()
181 updateLikeStreamOpts()
182 }()
183
184 logger.Info("serving subscriber")
185
186 for {
187 var msg SubscriberMessage
188 err := conn.ReadJSON(&msg)
189 if err != nil {
190 logger.Info("WebSocket connection closed", "error", err)
191 break
192 }
193 switch msg.Type {
194 case "update_listen_to":
195 // only allow this if we arent managing listen to
196 if sd.ListenType != ListenTypeNone {
197 continue
198 }
199
200 var innerMsg SubscriberUpdateListenTo
201 if err := json.Unmarshal(msg.Content, &innerMsg); err != nil {
202 logger.Info("invalid message", "error", err)
203 break
204 }
205 // remove all current listens and add the ones the user requested
206 for listenDid := range sd.ListenTo {
207 stopListeningTo(sd.DID, listenDid)
208 delete(sd.ListenTo, listenDid)
209 }
210 for _, listenDid := range innerMsg.ListenTo {
211 sd.ListenTo[listenDid] = struct{}{}
212 listenTo(sd, listenDid)
213 }
214 }
215 }
216}
217
218func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload {
219 return models.SubscriberOptionsUpdatePayload{
220 WantedCollections: []string{"app.bsky.feed.like"},
221 // WantedDIDs: getFollowsDids(),
222 }
223}
224
225func getSubscriberStreamOpts() models.SubscriberOptionsUpdatePayload {
226 return models.SubscriberOptionsUpdatePayload{
227 WantedCollections: []string{"app.bsky.feed.repost", "app.bsky.graph.follow"},
228 WantedDIDs: getSubscriberDids(),
229 }
230}
231
232func updateLikeStreamOpts() {
233 opts := getLikeStreamOpts()
234 err := likeStream.SendOptionsUpdate(opts)
235 if err != nil {
236 logger.Error("couldnt update like stream opts", "error", err)
237 return
238 }
239 logger.Info("updated like stream opts", "requestedDids", len(opts.WantedDIDs))
240}
241
242func updateSubscriberStreamOpts() {
243 opts := getSubscriberStreamOpts()
244 err := subscriberStream.SendOptionsUpdate(opts)
245 if err != nil {
246 logger.Error("couldnt update subscriber stream opts", "error", err)
247 return
248 }
249 logger.Info("updated subscriber stream opts", "userCount", len(opts.WantedDIDs))
250}
251
252func HandleLikeEvent(ctx context.Context, event *models.Event) error {
253 if event == nil || event.Commit == nil || len(event.Commit.Record) == 0 {
254 return nil
255 }
256
257 // skip handling event if its not from a source we are listening to
258 targets, exists := listeningTo.Get(event.Did)
259 if !exists {
260 return nil
261 }
262
263 var like bsky.FeedLike
264 if err := json.Unmarshal(event.Commit.Record, &like); err != nil {
265 logger.Error("failed to unmarshal like", "error", err)
266 return nil
267 }
268
269 targets.Range(func(s string, sd *SubscriberData) bool {
270 for repostURI, _ := range sd.Reposts {
271 // (un)liked a post the subscriber reposted
272 if like.Subject.Uri == repostURI {
273 notification := NotificationMessage{
274 Liked: event.Commit.Operation != models.CommitOperationDelete,
275 ByDid: event.Did,
276 RepostURI: repostURI,
277 }
278
279 if err := sd.Conn.WriteJSON(notification); err != nil {
280 logger.Error("failed to send notification", "subscriber", sd.DID, "error", err)
281 }
282 }
283 }
284 return true
285 })
286
287 return nil
288}
289
290func HandleSubscriberEvent(ctx context.Context, event *models.Event) error {
291 if event == nil || event.Commit == nil {
292 return nil
293 }
294
295 switch event.Commit.Collection {
296 case "app.bsky.feed.repost":
297 modifySubscribersWithEvent(
298 event,
299 func(s *SubscriberData, r bsky.FeedRepost, deleted bool) {
300 if deleted {
301 delete(s.Reposts, r.Subject.Uri)
302 } else {
303 s.Reposts[r.Subject.Uri] = struct{}{}
304 }
305 },
306 )
307 case "app.bsky.graph.follow":
308 modifySubscribersWithEvent(
309 event,
310 func(s *SubscriberData, r bsky.GraphFollow, deleted bool) {
311 // if we arent managing then we dont need to update anything
312 if s.ListenType != ListenTypeFollows {
313 return
314 }
315 if deleted {
316 stopListeningTo(s.DID, r.Subject)
317 delete(s.ListenTo, r.Subject)
318 } else {
319 s.ListenTo[r.Subject] = struct{}{}
320 listenTo(s, r.Subject)
321 }
322 },
323 )
324 }
325
326 return nil
327}
328
329type ModifyFunc[v any] func(*SubscriberData, v, bool)
330
331func modifySubscribersWithEvent[v any](event *models.Event, handle ModifyFunc[v]) error {
332 if len(event.Commit.Record) == 0 {
333 return nil
334 }
335
336 var data v
337 if err := json.Unmarshal(event.Commit.Record, &data); err != nil {
338 logger.Error("failed to unmarshal repost", "error", err, "raw", event.Commit.Record)
339 return nil
340 }
341
342 if subscriber, exists := subscribers.Get(event.Did); exists {
343 handle(subscriber, data, event.Commit.Operation == models.CommitOperationDelete)
344 }
345
346 return nil
347}