It's for when you want to get notifications for likes on your reposts.
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "log"
7 "log/slog"
8 "net/http"
9
10 "github.com/bluesky-social/indigo/api/bsky"
11 "github.com/bluesky-social/indigo/xrpc"
12 "github.com/bluesky-social/jetstream/pkg/client"
13 "github.com/bluesky-social/jetstream/pkg/models"
14 "github.com/cornelk/hashmap"
15 "github.com/gorilla/mux"
16 "github.com/gorilla/websocket"
17)
18
// Set is a minimal generic set backed by a map: an element is a member when
// its key is present, and the empty-struct values carry no data.
type Set[K comparable] map[K]struct{}
20
21type SubscriberData struct {
22 DID string
23 Conn *websocket.Conn
24 ListenTo Set[string]
25 Reposts Set[string]
26}
27
// NotificationMessage is the JSON payload written to a subscriber's websocket
// when a like event matches one of the subscriber's reposts.
type NotificationMessage struct {
	// Liked is false when the triggering commit is a delete and true otherwise.
	Liked bool `json:"liked"`
	// ByDid is the DID carried by the like event.
	ByDid string `json:"did"`
	// RepostURI is the URI from the subscriber's Reposts set that matched.
	RepostURI string `json:"repost_uri"`
}
33
34var (
35 // storing the subscriber data in both Should Be Fine
36 // we dont modify subscriber data at the same time in two places
37 subscribers = hashmap.New[string, *SubscriberData]()
38 listeningTo = hashmap.New[string, *hashmap.Map[string, *SubscriberData]]()
39
40 likeStream *client.Client
41 subscriberStream *client.Client
42
43 upgrader = websocket.Upgrader{
44 CheckOrigin: func(r *http.Request) bool {
45 return true
46 },
47 }
48
49 logger *slog.Logger
50)
51
52func getSubscriberDids() []string {
53 dids := make([]string, 0, subscribers.Len())
54 subscribers.Range(func(s string, sd *SubscriberData) bool {
55 dids = append(dids, s)
56 return true
57 })
58 return dids
59}
60
61func listenTo(sd *SubscriberData, did string) {
62 targetDids, _ := listeningTo.GetOrInsert(did, hashmap.New[string, *SubscriberData]())
63 targetDids.Insert(sd.DID, sd)
64}
65
66func stopListeningTo(subscriberDid, did string) {
67 if targetDids, exists := listeningTo.Get(did); exists {
68 targetDids.Del(subscriberDid)
69 }
70}
71
72func main() {
73 logger = slog.Default()
74
75 go likeStreamLoop(logger)
76 go subscriberStreamLoop(logger)
77
78 r := mux.NewRouter()
79 r.HandleFunc("/subscribe/{did}", handleSubscribe).Methods("GET")
80
81 log.Println("Server starting on :8080")
82 if err := http.ListenAndServe(":8080", r); err != nil {
83 log.Fatalf("error while serving: %s", err)
84 }
85}
86
87func handleSubscribe(w http.ResponseWriter, r *http.Request) {
88 vars := mux.Vars(r)
89 did := vars["did"]
90
91 logger = logger.With("did", did)
92
93 conn, err := upgrader.Upgrade(w, r, nil)
94 if err != nil {
95 logger.Error("WebSocket upgrade failed", "error", err)
96 return
97 }
98 defer conn.Close()
99
100 logger.Info("new subscriber")
101
102 pdsURI, err := findUserPDS(r.Context(), did)
103 if err != nil {
104 logger.Error("cant resolve user pds", "error", err)
105 return
106 }
107 logger = logger.With("pds", pdsURI)
108
109 xrpcClient := &xrpc.Client{
110 Host: pdsURI,
111 }
112 // todo: implement skipping fetching follows and allow specifying users to listen to via websocket
113 follows, err := fetchFollows(r.Context(), xrpcClient, did)
114 if err != nil {
115 logger.Error("error fetching follows", "error", err)
116 return
117 }
118 logger.Info("fetched follows")
119 reposts, err := fetchReposts(r.Context(), xrpcClient, did)
120 if err != nil {
121 logger.Error("error fetching reposts", "error", err)
122 return
123 }
124 logger.Info("fetched reposts")
125
126 sd := &SubscriberData{
127 DID: did,
128 Conn: conn,
129 // use user follows as default listen to
130 ListenTo: follows,
131 Reposts: reposts,
132 }
133
134 subscribers.Set(sd.DID, sd)
135 for listenDid := range sd.ListenTo {
136 listenTo(sd, listenDid)
137 }
138
139 updateSubscriberStreamOpts()
140 updateLikeStreamOpts()
141 // delete subscriber after we are done
142 defer func() {
143 for listenDid := range sd.ListenTo {
144 stopListeningTo(sd.DID, listenDid)
145 }
146 subscribers.Del(sd.DID)
147
148 updateSubscriberStreamOpts()
149 updateLikeStreamOpts()
150 }()
151
152 logger.Info("serving subscriber")
153
154 for {
155 _, _, err := conn.ReadMessage()
156 if err != nil {
157 logger.Info("WebSocket connection closed", "error", err)
158 break
159 }
160 }
161}
162
163func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload {
164 return models.SubscriberOptionsUpdatePayload{
165 WantedCollections: []string{"app.bsky.feed.like"},
166 // WantedDIDs: getFollowsDids(),
167 }
168}
169
170func getSubscriberStreamOpts() models.SubscriberOptionsUpdatePayload {
171 return models.SubscriberOptionsUpdatePayload{
172 WantedCollections: []string{"app.bsky.feed.repost", "app.bsky.graph.follow"},
173 WantedDIDs: getSubscriberDids(),
174 }
175}
176
177func updateLikeStreamOpts() {
178 opts := getLikeStreamOpts()
179 err := likeStream.SendOptionsUpdate(opts)
180 if err != nil {
181 logger.Error("couldnt update like stream opts", "error", err)
182 return
183 }
184 logger.Info("updated like stream opts", "requestedDids", len(opts.WantedDIDs))
185}
186
187func updateSubscriberStreamOpts() {
188 opts := getSubscriberStreamOpts()
189 err := subscriberStream.SendOptionsUpdate(opts)
190 if err != nil {
191 logger.Error("couldnt update subscriber stream opts", "error", err)
192 return
193 }
194 logger.Info("updated subscriber stream opts", "userCount", len(opts.WantedDIDs))
195}
196
197func likeStreamLoop(logger *slog.Logger) {
198 startJetstreamLoop(logger, &likeStream, "like_tracker", HandleLikeEvent, getLikeStreamOpts)
199}
200
201func subscriberStreamLoop(logger *slog.Logger) {
202 startJetstreamLoop(logger, &subscriberStream, "subscriber", HandleSubscriberEvent, getSubscriberStreamOpts)
203}
204
205func HandleLikeEvent(ctx context.Context, event *models.Event) error {
206 if event == nil || event.Commit == nil || len(event.Commit.Record) == 0 {
207 return nil
208 }
209
210 // skip handling event if its not from a source we are listening to
211 targets, exists := listeningTo.Get(event.Did)
212 if !exists {
213 return nil
214 }
215
216 var like bsky.FeedLike
217 if err := json.Unmarshal(event.Commit.Record, &like); err != nil {
218 logger.Error("failed to unmarshal like", "error", err)
219 return nil
220 }
221
222 targets.Range(func(s string, sd *SubscriberData) bool {
223 for repostURI, _ := range sd.Reposts {
224 // (un)liked a post the subscriber reposted
225 if like.Subject.Uri == repostURI {
226 notification := NotificationMessage{
227 Liked: event.Commit.Operation != models.CommitOperationDelete,
228 ByDid: event.Did,
229 RepostURI: repostURI,
230 }
231
232 if err := sd.Conn.WriteJSON(notification); err != nil {
233 logger.Error("failed to send notification", "subscriber", sd.DID, "error", err)
234 }
235 }
236 }
237 return true
238 })
239
240 return nil
241}
242
243func HandleSubscriberEvent(ctx context.Context, event *models.Event) error {
244 if event == nil || event.Commit == nil {
245 return nil
246 }
247
248 switch event.Commit.Collection {
249 case "app.bsky.feed.repost":
250 modifySubscribersWithEvent(
251 event,
252 func(s *SubscriberData, r bsky.FeedRepost) { delete(s.Reposts, r.Subject.Uri) },
253 func(s *SubscriberData, r bsky.FeedRepost) {
254 s.Reposts[r.Subject.Uri] = struct{}{}
255 },
256 )
257 case "app.bsky.graph.follow":
258 modifySubscribersWithEvent(
259 event,
260 func(s *SubscriberData, r bsky.GraphFollow) {
261 delete(s.ListenTo, r.Subject)
262 stopListeningTo(s.DID, r.Subject)
263 },
264 func(s *SubscriberData, r bsky.GraphFollow) {
265 s.ListenTo[r.Subject] = struct{}{}
266 listenTo(s, r.Subject)
267 },
268 )
269 }
270
271 return nil
272}
273
274type ModifyFunc[v any] func(*SubscriberData, v)
275
276func modifySubscribersWithEvent[v any](event *models.Event, onDelete ModifyFunc[v], onUpdate ModifyFunc[v]) error {
277 if len(event.Commit.Record) == 0 {
278 return nil
279 }
280
281 var data v
282 if err := json.Unmarshal(event.Commit.Record, &data); err != nil {
283 logger.Error("Failed to unmarshal repost", "error", err, "raw", event.Commit.Record)
284 return nil
285 }
286
287 if subscriber, exists := subscribers.Get(event.Did); exists {
288 if event.Commit.Operation == models.CommitOperationDelete {
289 onDelete(subscriber, data)
290 } else {
291 onUpdate(subscriber, data)
292 }
293 }
294
295 return nil
296}