its for when you want to get like notifications for your reposts
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "log/slog"
9 "net/http"
10 "sync"
11 "time"
12
13 "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/api/bsky"
15 "github.com/bluesky-social/indigo/xrpc"
16 "github.com/bluesky-social/jetstream/pkg/client"
17 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
18 "github.com/bluesky-social/jetstream/pkg/models"
19 "github.com/gorilla/mux"
20 "github.com/gorilla/websocket"
21)
22
23type Set[T comparable] map[T]struct{}
24
25// Data structures
26type SubscriberData struct {
27 DID string
28 Conn *websocket.Conn
29 Follows Set[string]
30 Reposts Set[string]
31}
32
33type NotificationMessage struct {
34 Liked bool `json:"liked"`
35 ByDid string `json:"did"`
36 RepostURI string `json:"repost_uri"`
37}
38
39// Global state
40var (
41 subscribers = make(map[string]*SubscriberData)
42 subscribersMux sync.RWMutex
43
44 likeStream *client.Client
45 subscriberStream *client.Client
46
47 xrpcClient *xrpc.Client
48
49 upgrader = websocket.Upgrader{
50 CheckOrigin: func(r *http.Request) bool {
51 return true
52 },
53 }
54
55 logger *slog.Logger
56)
57
58func main() {
59 logger = slog.Default()
60
61 xrpcClient = &xrpc.Client{
62 Client: &http.Client{
63 Timeout: 30 * time.Second,
64 },
65 Host: "https://bsky.social",
66 }
67
68 if err := initializeJetstreams(); err != nil {
69 log.Fatalf("cannot start jetstream: %s", err)
70 }
71
72 r := mux.NewRouter()
73 r.HandleFunc("/subscribe/{did}", handleSubscribe).Methods("GET")
74
75 log.Println("Server starting on :8080")
76 if err := http.ListenAndServe(":8080", r); err != nil {
77 log.Fatalf("error while serving: %s", err)
78 }
79}
80
81func handleSubscribe(w http.ResponseWriter, r *http.Request) {
82 vars := mux.Vars(r)
83 did := vars["did"]
84
85 conn, err := upgrader.Upgrade(w, r, nil)
86 if err != nil {
87 logger.Error("WebSocket upgrade failed", "error", err)
88 return
89 }
90 defer conn.Close()
91
92 logger.Info("New subscriber", "did", did)
93
94 follows, err := fetchFollows(r.Context(), did)
95 if err != nil {
96 logger.Error("Error fetching follows", "did", did, "error", err)
97 return
98 }
99
100 reposts, err := fetchReposts(r.Context(), did)
101 if err != nil {
102 logger.Error("Error fetching reposts", "did", did, "error", err)
103 return
104 }
105
106 // Store subscriber data
107 subscriber := &SubscriberData{
108 DID: did,
109 Conn: conn,
110 Follows: follows,
111 Reposts: reposts,
112 }
113
114 subscribersMux.Lock()
115 subscribers[did] = subscriber
116 subscribersMux.Unlock()
117 updateSubscriberStreamOpts()
118 // delete subscriber after we are done
119 defer func() {
120 subscribersMux.Lock()
121 delete(subscribers, did)
122 subscribersMux.Unlock()
123 updateSubscriberStreamOpts()
124 }()
125
126 for {
127 _, _, err := conn.ReadMessage()
128 if err != nil {
129 logger.Info("WebSocket connection closed", "did", did, "error", err)
130 break
131 }
132 }
133}
134
135func fetchReposts(ctx context.Context, did string) (Set[string], error) {
136 all := make(Set[string])
137 cursor := ""
138
139 for {
140 out, err := atproto.RepoListRecords(ctx, &xrpc.Client{}, "app.bsky.feed.repost", cursor, 100, did, false)
141 if err != nil {
142 return nil, err
143 }
144
145 for _, record := range out.Records {
146 all[record.Uri] = struct{}{}
147 }
148
149 if out.Cursor == nil || *out.Cursor == "" {
150 break
151 }
152 cursor = *out.Cursor
153 }
154
155 return all, nil
156}
157
158func fetchFollows(ctx context.Context, did string) (Set[string], error) {
159 all := make(Set[string])
160 cursor := ""
161
162 for {
163 out, err := bsky.GraphGetFollows(ctx, &xrpc.Client{}, did, cursor, 100)
164 if err != nil {
165 return nil, err
166 }
167
168 for _, record := range out.Follows {
169 all[record.Did] = struct{}{}
170 }
171
172 if out.Cursor == nil || *out.Cursor == "" {
173 break
174 }
175 cursor = *out.Cursor
176 }
177
178 return all, nil
179}
180
181func initializeJetstreams() error {
182 if err := startLikeClient(); err != nil {
183 return fmt.Errorf("like stream: %w", err)
184 }
185 if err := startSubscriberClient(); err != nil {
186 return fmt.Errorf("subscriber stream: %w", err)
187 }
188 return nil
189}
190
191func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload {
192 return models.SubscriberOptionsUpdatePayload{
193 WantedCollections: []string{"app.bsky.feed.like"},
194 WantedDIDs: getFollowsDids(),
195 }
196}
197
198func getSubscriberStreamOpts() models.SubscriberOptionsUpdatePayload {
199 return models.SubscriberOptionsUpdatePayload{
200 WantedCollections: []string{"app.bsky.feed.repost", "app.bsky.graph.follow"},
201 WantedDIDs: getSubscriberDids(),
202 }
203}
204
205func updateLikeStreamOpts() {
206 err := likeStream.SendOptionsUpdate(getLikeStreamOpts())
207 if err != nil {
208 // reinit like stream
209 }
210}
211
212func updateSubscriberStreamOpts() {
213 err := subscriberStream.SendOptionsUpdate(getSubscriberStreamOpts())
214 if err != nil {
215 // reinit subscriber stream
216 }
217}
218
219func startLikeClient() error {
220 opts := getLikeStreamOpts()
221 if len(opts.WantedDIDs) == 0 {
222 return nil // No follows to track
223 }
224
225 handler := &likeHandler{}
226 var err error
227 likeStream, err = startJetstreamClient("like_tracker", opts, handler.HandleEvent)
228 if err != nil {
229 return err
230 }
231
232 return nil
233}
234
235func startSubscriberClient() error {
236 opts := getSubscriberStreamOpts()
237 if len(opts.WantedDIDs) == 0 {
238 return nil // No subscribers to track
239 }
240
241 handler := &subscriberHandler{}
242 var err error
243 subscriberStream, err = startJetstreamClient("subscriber", opts, handler.HandleEvent)
244 if err != nil {
245 return err
246 }
247
248 return nil
249}
250
251func startJetstreamClient(name string, opts models.SubscriberOptionsUpdatePayload, handleEvent func(context.Context, *models.Event) error) (*client.Client, error) {
252 ctx := context.Background()
253
254 config := client.DefaultClientConfig()
255 config.WebsocketURL = "wss://jetstream.atproto.tools/subscribe"
256 config.Compress = true
257 config.WantedCollections = opts.WantedCollections
258 config.WantedDids = opts.WantedDIDs
259
260 scheduler := sequential.NewScheduler(name, logger, handleEvent)
261
262 c, err := client.NewClient(config, logger, scheduler)
263 if err != nil {
264 logger.Error("Failed to create client", "name", name, "error", err)
265 return nil, err
266 }
267
268 cursor := time.Now().UnixMicro()
269
270 logger.Info("Starting client", "name", name, "collections", opts.WantedCollections, "wanted_dids", len(opts.WantedDIDs))
271 if err := c.ConnectAndRead(ctx, &cursor); err != nil {
272 logger.Error("Client failed", "name", name, "error", err)
273 return nil, err
274 }
275
276 return c, nil
277}
278
279func getFollowsDids() []string {
280 subscribersMux.RLock()
281 defer subscribersMux.RUnlock()
282
283 var dids []string
284 for _, subscriber := range subscribers {
285 for follow, _ := range subscriber.Follows {
286 dids = append(dids, follow)
287 }
288 }
289
290 return dids
291}
292
293func getSubscriberDids() []string {
294 subscribersMux.RLock()
295 defer subscribersMux.RUnlock()
296
297 var dids []string
298 for did := range subscribers {
299 dids = append(dids, did)
300 }
301
302 return dids
303}
304
305type likeHandler struct{}
306
307func (h *likeHandler) HandleEvent(ctx context.Context, event *models.Event) error {
308 var like bsky.FeedLike
309 if err := json.Unmarshal(event.Commit.Record, &like); err != nil {
310 logger.Error("Failed to unmarshal like", "error", err)
311 return nil
312 }
313
314 subscribersMux.RLock()
315 defer subscribersMux.RUnlock()
316
317 for _, subscriber := range subscribers {
318 for repostURI, _ := range subscriber.Reposts {
319 // (un)liked a post the subscriber reposted
320 if like.Subject.Uri == repostURI {
321 notification := NotificationMessage{
322 Liked: event.Commit.Operation != models.CommitOperationDelete,
323 ByDid: event.Did,
324 RepostURI: repostURI,
325 }
326
327 if err := subscriber.Conn.WriteJSON(notification); err != nil {
328 logger.Error("Failed to send notification", "subscriber", subscriber.DID, "error", err)
329 }
330 }
331 }
332 }
333
334 return nil
335}
336
337type subscriberHandler struct{}
338
339func (h *subscriberHandler) HandleEvent(ctx context.Context, event *models.Event) error {
340 switch event.Commit.Collection {
341 case "app.bsky.feed.repost":
342 modifySubscribersWithEvent(
343 event,
344 func(s *SubscriberData, r bsky.FeedRepost) { delete(s.Reposts, r.Subject.Uri) },
345 func(s *SubscriberData, r bsky.FeedRepost) {
346 s.Reposts[r.Subject.Uri] = struct{}{}
347 },
348 )
349 case "app.bsky.graph.follow":
350 modifySubscribersWithEvent(
351 event,
352 func(s *SubscriberData, r bsky.GraphFollow) { delete(s.Follows, r.Subject) },
353 func(s *SubscriberData, r bsky.GraphFollow) {
354 s.Follows[r.Subject] = struct{}{}
355 },
356 )
357 updateLikeStreamOpts()
358 }
359
360 return nil
361}
362
363type ModifyFunc[v any] func(*SubscriberData, v)
364
365func modifySubscribersWithEvent[v any](event *models.Event, onDelete ModifyFunc[v], onUpdate ModifyFunc[v]) error {
366 var data v
367 if err := json.Unmarshal(event.Commit.Record, &data); err != nil {
368 logger.Error("Failed to unmarshal repost", "error", err)
369 return nil
370 }
371
372 subscribersMux.Lock()
373 defer subscribersMux.Unlock()
374
375 if subscriber, exists := subscribers[event.Did]; exists {
376 if event.Commit.Operation == models.CommitOperationDelete {
377 onDelete(subscriber, data)
378 } else {
379 onUpdate(subscriber, data)
380 }
381 }
382
383 return nil
384}