It's for when you want to get notifications about likes on your reposts.
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "log"
7 "log/slog"
8 "net/http"
9
10 "github.com/bluesky-social/indigo/api/bsky"
11 "github.com/bluesky-social/indigo/xrpc"
12 "github.com/bluesky-social/jetstream/pkg/client"
13 "github.com/bluesky-social/jetstream/pkg/models"
14 "github.com/cornelk/hashmap"
15 "github.com/gorilla/mux"
16 "github.com/gorilla/websocket"
17)
18
// Set is a minimal generic set built on a map: an element is a member when
// its key is present, and the empty-struct values carry no data.
type Set[T comparable] map[T]struct{}
20
21// Data structures
22type SubscriberData struct {
23 DID string
24 Conn *websocket.Conn
25 ListenTo Set[string]
26 Reposts Set[string]
27}
28
// NotificationMessage is the JSON payload written to a subscriber's WebSocket
// when a like event matches one of the subscriber's reposts.
type NotificationMessage struct {
	// Liked is false when the triggering commit was a delete operation.
	Liked bool `json:"liked"`
	// ByDid is the DID carried by the like event.
	ByDid string `json:"did"`
	// RepostURI is the URI of the liked subject, as stored in the
	// subscriber's repost set.
	RepostURI string `json:"repost_uri"`
}
34
35// Global state
36var (
37 subscribers = hashmap.New[string, *SubscriberData]()
38
39 likeStream *client.Client
40 subscriberStream *client.Client
41
42 upgrader = websocket.Upgrader{
43 CheckOrigin: func(r *http.Request) bool {
44 return true
45 },
46 }
47
48 logger *slog.Logger
49)
50
51func getFollowsDids() []string {
52 var dids []string
53 subscribers.Range(func(s string, sd *SubscriberData) bool {
54 for follow, _ := range sd.ListenTo {
55 dids = append(dids, follow)
56 }
57 return true
58 })
59 return dids
60}
61
62func getSubscriberDids() []string {
63 dids := make([]string, 0, subscribers.Len())
64 subscribers.Range(func(s string, sd *SubscriberData) bool {
65 dids = append(dids, s)
66 return true
67 })
68 return dids
69}
70
71func main() {
72 logger = slog.Default()
73
74 go likeStreamLoop(logger)
75 go subscriberStreamLoop(logger)
76
77 r := mux.NewRouter()
78 r.HandleFunc("/subscribe/{did}", handleSubscribe).Methods("GET")
79
80 log.Println("Server starting on :8080")
81 if err := http.ListenAndServe(":8080", r); err != nil {
82 log.Fatalf("error while serving: %s", err)
83 }
84}
85
86func handleSubscribe(w http.ResponseWriter, r *http.Request) {
87 vars := mux.Vars(r)
88 did := vars["did"]
89
90 logger = logger.With("did", did)
91
92 conn, err := upgrader.Upgrade(w, r, nil)
93 if err != nil {
94 logger.Error("WebSocket upgrade failed", "error", err)
95 return
96 }
97 defer conn.Close()
98
99 logger.Info("new subscriber")
100
101 pdsURI, err := findUserPDS(r.Context(), did)
102 if err != nil {
103 logger.Error("cant resolve user pds", "error", err)
104 return
105 }
106 logger = logger.With("pds", pdsURI)
107
108 xrpcClient := &xrpc.Client{
109 Host: pdsURI,
110 }
111 // todo: implement skipping fetching follows and allow specifying users to listen to via websocket
112 follows, err := fetchFollows(r.Context(), xrpcClient, did)
113 if err != nil {
114 logger.Error("error fetching follows", "error", err)
115 return
116 }
117 logger.Info("fetched follows")
118 reposts, err := fetchReposts(r.Context(), xrpcClient, did)
119 if err != nil {
120 logger.Error("error fetching reposts", "error", err)
121 return
122 }
123 logger.Info("fetched reposts")
124
125 subscriber := &SubscriberData{
126 DID: did,
127 Conn: conn,
128 // use user follows as default listen to
129 ListenTo: follows,
130 Reposts: reposts,
131 }
132
133 subscribers.Set(did, subscriber)
134 updateSubscriberStreamOpts()
135 updateLikeStreamOpts()
136 // delete subscriber after we are done
137 defer func() {
138 subscribers.Del(did)
139 updateSubscriberStreamOpts()
140 updateLikeStreamOpts()
141 }()
142
143 logger.Info("serving subscriber")
144
145 for {
146 _, _, err := conn.ReadMessage()
147 if err != nil {
148 logger.Info("WebSocket connection closed", "error", err)
149 break
150 }
151 }
152}
153
154func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload {
155 return models.SubscriberOptionsUpdatePayload{
156 WantedCollections: []string{"app.bsky.feed.like"},
157 WantedDIDs: getFollowsDids(),
158 }
159}
160
161func getSubscriberStreamOpts() models.SubscriberOptionsUpdatePayload {
162 return models.SubscriberOptionsUpdatePayload{
163 WantedCollections: []string{"app.bsky.feed.repost", "app.bsky.graph.follow"},
164 WantedDIDs: getSubscriberDids(),
165 }
166}
167
168func updateLikeStreamOpts() {
169 opts := getLikeStreamOpts()
170 err := likeStream.SendOptionsUpdate(opts)
171 if err != nil {
172 logger.Error("couldnt update like stream opts", "error", err)
173 return
174 }
175 logger.Info("updated like stream opts", "requestedDids", len(opts.WantedDIDs))
176}
177
178func updateSubscriberStreamOpts() {
179 opts := getSubscriberStreamOpts()
180 err := subscriberStream.SendOptionsUpdate(opts)
181 if err != nil {
182 logger.Error("couldnt update subscriber stream opts", "error", err)
183 return
184 }
185 logger.Info("updated subscriber stream opts", "userCount", len(opts.WantedDIDs))
186}
187
188func likeStreamLoop(logger *slog.Logger) {
189 startJetstreamLoop(logger, &likeStream, "like_tracker", HandleLikeEvent, getLikeStreamOpts)
190}
191
192func subscriberStreamLoop(logger *slog.Logger) {
193 startJetstreamLoop(logger, &subscriberStream, "subscriber", HandleSubscriberEvent, getSubscriberStreamOpts)
194}
195
196func HandleLikeEvent(ctx context.Context, event *models.Event) error {
197 if event == nil || event.Commit == nil || len(event.Commit.Record) == 0 {
198 return nil
199 }
200
201 var like bsky.FeedLike
202 if err := json.Unmarshal(event.Commit.Record, &like); err != nil {
203 logger.Error("Failed to unmarshal like", "error", err)
204 return nil
205 }
206
207 subscribers.Range(func(s string, sd *SubscriberData) bool {
208 for repostURI, _ := range sd.Reposts {
209 // (un)liked a post the subscriber reposted
210 if like.Subject.Uri == repostURI {
211 notification := NotificationMessage{
212 Liked: event.Commit.Operation != models.CommitOperationDelete,
213 ByDid: event.Did,
214 RepostURI: repostURI,
215 }
216
217 if err := sd.Conn.WriteJSON(notification); err != nil {
218 logger.Error("Failed to send notification", "subscriber", sd.DID, "error", err)
219 }
220 }
221 }
222 return true
223 })
224
225 return nil
226}
227
228func HandleSubscriberEvent(ctx context.Context, event *models.Event) error {
229 if event == nil || event.Commit == nil {
230 return nil
231 }
232
233 switch event.Commit.Collection {
234 case "app.bsky.feed.repost":
235 modifySubscribersWithEvent(
236 event,
237 func(s *SubscriberData, r bsky.FeedRepost) { delete(s.Reposts, r.Subject.Uri) },
238 func(s *SubscriberData, r bsky.FeedRepost) {
239 s.Reposts[r.Subject.Uri] = struct{}{}
240 },
241 )
242 case "app.bsky.graph.follow":
243 modifySubscribersWithEvent(
244 event,
245 func(s *SubscriberData, r bsky.GraphFollow) { delete(s.ListenTo, r.Subject) },
246 func(s *SubscriberData, r bsky.GraphFollow) {
247 s.ListenTo[r.Subject] = struct{}{}
248 },
249 )
250 }
251
252 return nil
253}
254
255type ModifyFunc[v any] func(*SubscriberData, v)
256
257func modifySubscribersWithEvent[v any](event *models.Event, onDelete ModifyFunc[v], onUpdate ModifyFunc[v]) error {
258 if len(event.Commit.Record) == 0 {
259 return nil
260 }
261
262 var data v
263 if err := json.Unmarshal(event.Commit.Record, &data); err != nil {
264 logger.Error("Failed to unmarshal repost", "error", err, "raw", event.Commit.Record)
265 return nil
266 }
267
268 if subscriber, exists := subscribers.Get(event.Did); exists {
269 if event.Commit.Operation == models.CommitOperationDelete {
270 onDelete(subscriber, data)
271 } else {
272 onUpdate(subscriber, data)
273 }
274 }
275
276 return nil
277}