// Package main runs a WebSocket server that notifies each subscriber when an
// account the subscriber listens to likes a post the subscriber has reposted.
package main

import (
	"context"
	"encoding/json"
	"log"
	"log/slog"
	"net/http"
	"sync"

	"github.com/bluesky-social/indigo/api/bsky"
	"github.com/bluesky-social/indigo/xrpc"
	"github.com/bluesky-social/jetstream/pkg/client"
	"github.com/bluesky-social/jetstream/pkg/models"
	"github.com/cornelk/hashmap"
	"github.com/gorilla/mux"
	"github.com/gorilla/websocket"
)

// Set is a collection of unique values backed by a map with empty values.
type Set[T comparable] map[T]struct{}

// Accepted values of the "listenTo" query parameter of /subscribe/{did}.
const (
	// ListenTypeNone starts with an empty listen set that the client manages
	// itself through "update_listen_to" messages.
	ListenTypeNone = "none"
	// ListenTypeFollows seeds the listen set from the subscriber's follows and
	// keeps it in sync with follow events.
	ListenTypeFollows = "follows"
)

// SubscriberData is the per-connection state of one subscriber.
type SubscriberData struct {
	DID        string
	Conn       *websocket.Conn
	ListenType string
	// ListenTo holds the DIDs whose likes this subscriber is notified about.
	ListenTo Set[string]
	// Reposts holds the URIs of the posts this subscriber has reposted.
	Reposts Set[string]

	// mu guards ListenTo and Reposts. Both sets are touched by the WebSocket
	// handler goroutine and by the Jetstream event handlers, and plain Go maps
	// must not be read and written concurrently.
	mu sync.Mutex
}

// NotificationMessage is the JSON payload pushed to a subscriber when one of
// their reposts is liked.
type NotificationMessage struct {
	Liked     bool   `json:"liked"`
	ByDid     string `json:"did"`
	RepostURI string `json:"repost_uri"`
}

// SubscriberMessage is the envelope of a message sent by a subscriber; Content
// is decoded according to Type.
type SubscriberMessage struct {
	Type    string          `json:"type"`
	Content json.RawMessage `json:"content"`
}

// SubscriberUpdateListenTo is the content of an "update_listen_to" message.
type SubscriberUpdateListenTo struct {
	ListenTo []string `json:"listen_to"`
}

var (
	// subscribers maps a subscriber DID to its state. listeningTo maps a
	// listened-to DID to the subscribers (keyed by subscriber DID) interested
	// in it. Both indexes share the same *SubscriberData; the sets inside it
	// are protected by SubscriberData.mu.
	subscribers = hashmap.New[string, *SubscriberData]()
	listeningTo = hashmap.New[string, *hashmap.Map[string, *SubscriberData]]()

	likeStream       *client.Client
	subscriberStream *client.Client

	upgrader = websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}

	logger *slog.Logger
)

// getSubscriberDids returns the DIDs of all currently registered subscribers.
func getSubscriberDids() []string {
	dids := make([]string, 0, subscribers.Len())
	subscribers.Range(func(did string, _ *SubscriberData) bool {
		dids = append(dids, did)
		return true
	})
	return dids
}

// listenTo registers sd as interested in likes made by did.
func listenTo(sd *SubscriberData, did string) {
	// Look up first so a fresh inner map is only allocated when none exists.
	targetDids, exists := listeningTo.Get(did)
	if !exists {
		targetDids, _ = listeningTo.GetOrInsert(did, hashmap.New[string, *SubscriberData]())
	}
	targetDids.Insert(sd.DID, sd)
}

// stopListeningTo removes the subscriber identified by subscriberDid from the
// set of subscribers interested in did. It is a no-op when did is unknown.
//
// NOTE(review): inner maps that become empty are never removed from
// listeningTo.
func stopListeningTo(subscriberDid, did string) {
	if targetDids, exists := listeningTo.Get(did); exists {
		targetDids.Del(subscriberDid)
	}
}

// main starts both Jetstream loops in the background and serves the
// /subscribe/{did} WebSocket endpoint on :8080.
func main() {
	logger = slog.Default()

	// startJetstreamLoop is defined elsewhere in this package.
	go startJetstreamLoop(logger, &likeStream, "like_tracker", HandleLikeEvent, getLikeStreamOpts)
	go startJetstreamLoop(logger, &subscriberStream, "subscriber", HandleSubscriberEvent, getSubscriberStreamOpts)

	r := mux.NewRouter()
	r.HandleFunc("/subscribe/{did}", handleSubscribe).Methods("GET")

	log.Println("server starting on :8080")
	if err := http.ListenAndServe(":8080", r); err != nil {
		log.Fatalf("error while serving: %s", err)
	}
}

// handleSubscribe upgrades the request to a WebSocket, registers the subscriber
// named by the {did} path variable and serves its messages until the
// connection ends, at which point the subscriber is unregistered again.
//
// The optional "listenTo" query parameter selects the listen type: "follows"
// (the default when absent) or "none". Any other value is rejected with
// 400 Bad Request before the connection is upgraded.
//
// NOTE(review): state is keyed by DID only, so when two connections use the
// same DID the cleanup of the first one also removes the entries of the second.
func handleSubscribe(w http.ResponseWriter, r *http.Request) {
	did := mux.Vars(r)["did"]

	listenType := r.URL.Query().Get("listenTo")
	if listenType == "" {
		listenType = ListenTypeFollows
	}

	// Derive a request-scoped logger. Assigning back to the package-level
	// logger would leak this request's attributes into every later log line
	// and race with other goroutines.
	reqLog := logger.With("did", did)

	if listenType != ListenTypeFollows && listenType != ListenTypeNone {
		reqLog.Error("invalid listen type", "requestedType", listenType)
		http.Error(w, "invalid listenTo value", http.StatusBadRequest)
		return
	}

	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		reqLog.Error("WebSocket upgrade failed", "error", err)
		return
	}
	defer conn.Close()

	reqLog.Info("new subscriber")

	pdsURI, err := findUserPDS(r.Context(), did)
	if err != nil {
		reqLog.Error("cant resolve user pds", "error", err)
		return
	}
	reqLog = reqLog.With("pds", pdsURI)

	xrpcClient := &xrpc.Client{
		Host: pdsURI,
	}

	var subbedTo Set[string]
	if listenType == ListenTypeFollows {
		follows, err := fetchFollows(r.Context(), xrpcClient, did)
		if err != nil {
			reqLog.Error("error fetching follows", "error", err)
			return
		}
		reqLog.Info("fetched follows")
		subbedTo = follows
	}

	reposts, err := fetchReposts(r.Context(), xrpcClient, did)
	if err != nil {
		reqLog.Error("error fetching reposts", "error", err)
		return
	}
	reqLog.Info("fetched reposts")

	sd := &SubscriberData{
		DID:        did,
		Conn:       conn,
		ListenType: listenType,
		ListenTo:   subbedTo,
		Reposts:    reposts,
	}
	// Both sets are written to later; writing to a nil map panics.
	if sd.ListenTo == nil {
		sd.ListenTo = make(Set[string])
	}
	if sd.Reposts == nil {
		sd.Reposts = make(Set[string])
	}

	subscribers.Set(sd.DID, sd)
	// From here on the event handlers can reach sd, so take the lock.
	sd.mu.Lock()
	for listenDid := range sd.ListenTo {
		listenTo(sd, listenDid)
	}
	sd.mu.Unlock()

	updateSubscriberStreamOpts()
	updateLikeStreamOpts()

	// Unregister the subscriber once the connection is done.
	defer func() {
		sd.mu.Lock()
		for listenDid := range sd.ListenTo {
			stopListeningTo(sd.DID, listenDid)
		}
		sd.mu.Unlock()

		subscribers.Del(sd.DID)
		updateSubscriberStreamOpts()
		updateLikeStreamOpts()
	}()

	reqLog.Info("serving subscriber")

	for {
		var msg SubscriberMessage
		if err := conn.ReadJSON(&msg); err != nil {
			reqLog.Info("WebSocket connection closed", "error", err)
			return
		}

		switch msg.Type {
		case "update_listen_to":
			// Only honoured when the server is not managing the listen set.
			if sd.ListenType != ListenTypeNone {
				continue
			}

			var innerMsg SubscriberUpdateListenTo
			if err := json.Unmarshal(msg.Content, &innerMsg); err != nil {
				reqLog.Info("invalid message", "error", err)
				// Skip the bad message and keep serving the connection.
				continue
			}

			// Replace the current listen set with the requested one.
			sd.mu.Lock()
			for listenDid := range sd.ListenTo {
				stopListeningTo(sd.DID, listenDid)
				delete(sd.ListenTo, listenDid)
			}
			for _, listenDid := range innerMsg.ListenTo {
				sd.ListenTo[listenDid] = struct{}{}
				listenTo(sd, listenDid)
			}
			sd.mu.Unlock()
		}
	}
}

// getLikeStreamOpts returns the options for the like stream: like records
// only, with no DID filter.
func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload {
	return models.SubscriberOptionsUpdatePayload{
		WantedCollections: []string{"app.bsky.feed.like"},
		// WantedDIDs: getFollowsDids(),
	}
}

// getSubscriberStreamOpts returns the options for the subscriber stream:
// repost and follow records authored by the current subscribers.
func getSubscriberStreamOpts() models.SubscriberOptionsUpdatePayload {
	return models.SubscriberOptionsUpdatePayload{
		WantedCollections: []string{"app.bsky.feed.repost", "app.bsky.graph.follow"},
		WantedDIDs:        getSubscriberDids(),
	}
}

// updateLikeStreamOpts sends the current like stream options to the like
// stream. It logs and returns when the stream client is not set yet or the
// update fails.
func updateLikeStreamOpts() {
	if likeStream == nil {
		logger.Warn("like stream not ready, skipping opts update")
		return
	}

	opts := getLikeStreamOpts()
	if err := likeStream.SendOptionsUpdate(opts); err != nil {
		logger.Error("couldnt update like stream opts", "error", err)
		return
	}
	logger.Info("updated like stream opts", "requestedDids", len(opts.WantedDIDs))
}

// updateSubscriberStreamOpts sends the current subscriber stream options to
// the subscriber stream. It logs and returns when the stream client is not set
// yet or the update fails.
func updateSubscriberStreamOpts() {
	if subscriberStream == nil {
		logger.Warn("subscriber stream not ready, skipping opts update")
		return
	}

	opts := getSubscriberStreamOpts()
	if err := subscriberStream.SendOptionsUpdate(opts); err != nil {
		logger.Error("couldnt update subscriber stream opts", "error", err)
		return
	}
	logger.Info("updated subscriber stream opts", "userCount", len(opts.WantedDIDs))
}

// HandleLikeEvent notifies every subscriber listening to the event's author
// whose reposts contain the liked post. It always returns nil; problems are
// logged instead.
//
// NOTE(review): events without a record are skipped. If delete commits arrive
// without a record, "unliked" notifications (Liked == false) are never sent.
// Confirm against the Jetstream event format.
//
// NOTE(review): gorilla/websocket permits a single concurrent writer per
// connection. If this handler can be invoked in parallel, writes to sd.Conn
// need to be serialized.
func HandleLikeEvent(ctx context.Context, event *models.Event) error {
	if event == nil || event.Commit == nil || len(event.Commit.Record) == 0 {
		return nil
	}

	// Skip the event if nobody listens to its author.
	targets, exists := listeningTo.Get(event.Did)
	if !exists {
		return nil
	}

	var like bsky.FeedLike
	if err := json.Unmarshal(event.Commit.Record, &like); err != nil {
		logger.Error("failed to unmarshal like", "error", err)
		return nil
	}
	// Subject is a pointer and is nil when the record lacks the field.
	if like.Subject == nil {
		return nil
	}

	notification := NotificationMessage{
		Liked:     event.Commit.Operation != models.CommitOperationDelete,
		ByDid:     event.Did,
		RepostURI: like.Subject.Uri,
	}

	targets.Range(func(_ string, sd *SubscriberData) bool {
		// Direct set lookup; release the lock before the network write so a
		// slow connection cannot block updates to the subscriber's sets.
		sd.mu.Lock()
		_, reposted := sd.Reposts[like.Subject.Uri]
		sd.mu.Unlock()
		if !reposted {
			return true
		}

		if err := sd.Conn.WriteJSON(notification); err != nil {
			logger.Error("failed to send notification", "subscriber", sd.DID, "error", err)
		}
		return true
	})

	return nil
}

// HandleSubscriberEvent keeps a subscriber's Reposts and ListenTo sets in sync
// with the repost and follow records that subscriber creates or deletes.
// Events for other collections are ignored.
func HandleSubscriberEvent(ctx context.Context, event *models.Event) error {
	if event == nil || event.Commit == nil {
		return nil
	}

	switch event.Commit.Collection {
	case "app.bsky.feed.repost":
		return modifySubscribersWithEvent(
			event,
			func(s *SubscriberData, r bsky.FeedRepost, deleted bool) {
				// Subject is a pointer and is nil when the record lacks it.
				if r.Subject == nil {
					return
				}
				if deleted {
					delete(s.Reposts, r.Subject.Uri)
				} else {
					s.Reposts[r.Subject.Uri] = struct{}{}
				}
			},
		)
	case "app.bsky.graph.follow":
		return modifySubscribersWithEvent(
			event,
			func(s *SubscriberData, r bsky.GraphFollow, deleted bool) {
				// Nothing to do unless the server manages the listen set.
				if s.ListenType != ListenTypeFollows {
					return
				}
				if deleted {
					stopListeningTo(s.DID, r.Subject)
					delete(s.ListenTo, r.Subject)
				} else {
					s.ListenTo[r.Subject] = struct{}{}
					listenTo(s, r.Subject)
				}
			},
		)
	}

	return nil
}

// ModifyFunc applies a decoded record of type T to a subscriber. The bool
// reports whether the commit operation was a delete. It is called with the
// subscriber's mu held, so it may access ListenTo and Reposts directly and
// must not lock mu itself.
type ModifyFunc[T any] func(*SubscriberData, T, bool)

// modifySubscribersWithEvent decodes the event's record as T and, when the
// event's author is a registered subscriber, passes it to handle while holding
// that subscriber's lock. It always returns nil; decode failures are logged.
//
// NOTE(review): events without a record are skipped. If delete commits arrive
// without a record, handle is never called with deleted == true. Confirm
// against the Jetstream event format.
func modifySubscribersWithEvent[T any](event *models.Event, handle ModifyFunc[T]) error {
	if len(event.Commit.Record) == 0 {
		return nil
	}

	var data T
	if err := json.Unmarshal(event.Commit.Record, &data); err != nil {
		logger.Error(
			"failed to unmarshal record",
			"collection", event.Commit.Collection,
			"error", err,
			"raw", string(event.Commit.Record),
		)
		return nil
	}

	subscriber, exists := subscribers.Get(event.Did)
	if !exists {
		return nil
	}

	subscriber.mu.Lock()
	defer subscriber.mu.Unlock()
	handle(subscriber, data, event.Commit.Operation == models.CommitOperationDelete)

	return nil
}