its for when you want to get like notifications for your reposts
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "log"
7 "log/slog"
8 "net/http"
9 "sync"
10
11 "github.com/bluesky-social/indigo/api/bsky"
12 "github.com/bluesky-social/indigo/xrpc"
13 "github.com/bluesky-social/jetstream/pkg/client"
14 "github.com/bluesky-social/jetstream/pkg/models"
15 "github.com/gorilla/mux"
16 "github.com/gorilla/websocket"
17)
18
19type Set[T comparable] map[T]struct{}
20
21// Data structures
22type SubscriberData struct {
23 DID string
24 Conn *websocket.Conn
25 ListenTo Set[string]
26 Reposts Set[string]
27}
28
29type NotificationMessage struct {
30 Liked bool `json:"liked"`
31 ByDid string `json:"did"`
32 RepostURI string `json:"repost_uri"`
33}
34
35// Global state
36var (
37 subscribers = make(map[string]*SubscriberData)
38 subscribersMux sync.RWMutex
39
40 likeStream *client.Client
41 subscriberStream *client.Client
42
43 upgrader = websocket.Upgrader{
44 CheckOrigin: func(r *http.Request) bool {
45 return true
46 },
47 }
48
49 logger *slog.Logger
50)
51
52func getFollowsDids() []string {
53 subscribersMux.RLock()
54 defer subscribersMux.RUnlock()
55
56 var dids []string
57 for _, subscriber := range subscribers {
58 for follow, _ := range subscriber.ListenTo {
59 dids = append(dids, follow)
60 }
61 }
62
63 return dids
64}
65
66func getSubscriberDids() []string {
67 subscribersMux.RLock()
68 defer subscribersMux.RUnlock()
69
70 var dids []string
71 for did := range subscribers {
72 dids = append(dids, did)
73 }
74
75 return dids
76}
77
78func main() {
79 logger = slog.Default()
80
81 go likeStreamLoop(logger)
82 go subscriberStreamLoop(logger)
83
84 r := mux.NewRouter()
85 r.HandleFunc("/subscribe/{did}", handleSubscribe).Methods("GET")
86
87 log.Println("Server starting on :8080")
88 if err := http.ListenAndServe(":8080", r); err != nil {
89 log.Fatalf("error while serving: %s", err)
90 }
91}
92
93func handleSubscribe(w http.ResponseWriter, r *http.Request) {
94 vars := mux.Vars(r)
95 did := vars["did"]
96
97 logger = logger.With("did", did)
98
99 conn, err := upgrader.Upgrade(w, r, nil)
100 if err != nil {
101 logger.Error("WebSocket upgrade failed", "error", err)
102 return
103 }
104 defer conn.Close()
105
106 logger.Info("new subscriber")
107
108 pdsURI, err := findUserPDS(r.Context(), did)
109 if err != nil {
110 logger.Error("cant resolve user pds", "error", err)
111 return
112 }
113 logger = logger.With("pds", pdsURI)
114
115 xrpcClient := &xrpc.Client{
116 Host: pdsURI,
117 }
118 // todo: implement skipping fetching follows and allow specifying users to listen to via websocket
119 follows, err := fetchFollows(r.Context(), xrpcClient, did)
120 if err != nil {
121 logger.Error("error fetching follows", "error", err)
122 return
123 }
124 logger.Info("fetched follows")
125 reposts, err := fetchReposts(r.Context(), xrpcClient, did)
126 if err != nil {
127 logger.Error("error fetching reposts", "error", err)
128 return
129 }
130 logger.Info("fetched reposts")
131
132 subscriber := &SubscriberData{
133 DID: did,
134 Conn: conn,
135 // use user follows as default listen to
136 ListenTo: follows,
137 Reposts: reposts,
138 }
139
140 subscribersMux.Lock()
141 subscribers[did] = subscriber
142 subscribersMux.Unlock()
143 updateSubscriberStreamOpts()
144 updateLikeStreamOpts()
145 // delete subscriber after we are done
146 defer func() {
147 subscribersMux.Lock()
148 delete(subscribers, did)
149 subscribersMux.Unlock()
150 updateSubscriberStreamOpts()
151 updateLikeStreamOpts()
152 }()
153
154 logger.Info("serving subscriber")
155
156 for {
157 _, _, err := conn.ReadMessage()
158 if err != nil {
159 logger.Info("WebSocket connection closed", "error", err)
160 break
161 }
162 }
163}
164
165func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload {
166 return models.SubscriberOptionsUpdatePayload{
167 WantedCollections: []string{"app.bsky.feed.like"},
168 WantedDIDs: getFollowsDids(),
169 }
170}
171
172func getSubscriberStreamOpts() models.SubscriberOptionsUpdatePayload {
173 return models.SubscriberOptionsUpdatePayload{
174 WantedCollections: []string{"app.bsky.feed.repost", "app.bsky.graph.follow"},
175 WantedDIDs: getSubscriberDids(),
176 }
177}
178
179func updateLikeStreamOpts() {
180 opts := getLikeStreamOpts()
181 err := likeStream.SendOptionsUpdate(opts)
182 if err != nil {
183 logger.Error("couldnt update like stream opts", "error", err)
184 return
185 }
186 logger.Info("updated like stream opts", "requestedDids", len(opts.WantedDIDs))
187}
188
189func updateSubscriberStreamOpts() {
190 opts := getSubscriberStreamOpts()
191 err := subscriberStream.SendOptionsUpdate(opts)
192 if err != nil {
193 logger.Error("couldnt update subscriber stream opts", "error", err)
194 return
195 }
196 logger.Info("updated subscriber stream opts", "userCount", len(opts.WantedDIDs))
197}
198
199func likeStreamLoop(logger *slog.Logger) {
200 startJetstreamLoop(logger, &likeStream, "like_tracker", HandleLikeEvent, getLikeStreamOpts)
201}
202
203func subscriberStreamLoop(logger *slog.Logger) {
204 startJetstreamLoop(logger, &subscriberStream, "subscriber", HandleSubscriberEvent, getSubscriberStreamOpts)
205}
206
207func HandleLikeEvent(ctx context.Context, event *models.Event) error {
208 if event == nil || event.Commit == nil || len(event.Commit.Record) == 0 {
209 return nil
210 }
211
212 var like bsky.FeedLike
213 if err := json.Unmarshal(event.Commit.Record, &like); err != nil {
214 logger.Error("Failed to unmarshal like", "error", err)
215 return nil
216 }
217
218 subscribersMux.RLock()
219 defer subscribersMux.RUnlock()
220
221 for _, subscriber := range subscribers {
222 for repostURI, _ := range subscriber.Reposts {
223 // (un)liked a post the subscriber reposted
224 if like.Subject.Uri == repostURI {
225 notification := NotificationMessage{
226 Liked: event.Commit.Operation != models.CommitOperationDelete,
227 ByDid: event.Did,
228 RepostURI: repostURI,
229 }
230
231 if err := subscriber.Conn.WriteJSON(notification); err != nil {
232 logger.Error("Failed to send notification", "subscriber", subscriber.DID, "error", err)
233 }
234 }
235 }
236 }
237
238 return nil
239}
240
241func HandleSubscriberEvent(ctx context.Context, event *models.Event) error {
242 if event == nil || event.Commit == nil {
243 return nil
244 }
245
246 switch event.Commit.Collection {
247 case "app.bsky.feed.repost":
248 modifySubscribersWithEvent(
249 event,
250 func(s *SubscriberData, r bsky.FeedRepost) { delete(s.Reposts, r.Subject.Uri) },
251 func(s *SubscriberData, r bsky.FeedRepost) {
252 s.Reposts[r.Subject.Uri] = struct{}{}
253 },
254 )
255 case "app.bsky.graph.follow":
256 modifySubscribersWithEvent(
257 event,
258 func(s *SubscriberData, r bsky.GraphFollow) { delete(s.ListenTo, r.Subject) },
259 func(s *SubscriberData, r bsky.GraphFollow) {
260 s.ListenTo[r.Subject] = struct{}{}
261 },
262 )
263 }
264
265 return nil
266}
267
268type ModifyFunc[v any] func(*SubscriberData, v)
269
270func modifySubscribersWithEvent[v any](event *models.Event, onDelete ModifyFunc[v], onUpdate ModifyFunc[v]) error {
271 if len(event.Commit.Record) == 0 {
272 return nil
273 }
274
275 var data v
276 if err := json.Unmarshal(event.Commit.Record, &data); err != nil {
277 logger.Error("Failed to unmarshal repost", "error", err, "raw", event.Commit.Record)
278 return nil
279 }
280
281 subscribersMux.Lock()
282 defer subscribersMux.Unlock()
283
284 if subscriber, exists := subscribers[event.Did]; exists {
285 if event.Commit.Operation == models.CommitOperationDelete {
286 onDelete(subscriber, data)
287 } else {
288 onUpdate(subscriber, data)
289 }
290 }
291
292 return nil
293}