package main import ( "context" "encoding/json" "log" "log/slog" "net/http" "sync/atomic" "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/indigo/atproto/syntax" "github.com/bluesky-social/indigo/xrpc" "github.com/bluesky-social/jetstream/pkg/client" "github.com/bluesky-social/jetstream/pkg/models" "github.com/cornelk/hashmap" "github.com/google/uuid" "github.com/gorilla/mux" "github.com/gorilla/websocket" ) type Set[T comparable] map[T]struct{} const ListenTypeNone = "none" const ListenTypeFollows = "follows" type SubscriberData struct { SubscribedTo syntax.DID Conn *websocket.Conn ListenType string ListenTo Set[syntax.DID] } type UserData struct { targets *hashmap.Map[string, *SubscriberData] likes map[syntax.RecordKey]bsky.FeedLike follows *hashmap.Map[syntax.RecordKey, bsky.GraphFollow] followsCursor atomic.Pointer[string] } type NotificationMessage struct { Liked bool `json:"liked"` ByDid syntax.DID `json:"did"` RepostURI syntax.ATURI `json:"repost_uri"` } type SubscriberMessage struct { Type string `json:"type"` Content json.RawMessage `json:"content"` } type SubscriberUpdateListenTo struct { ListenTo []syntax.DID `json:"listen_to"` } var ( // storing the subscriber data in both Should Be Fine // we dont modify subscriber data at the same time in two places subscribers = hashmap.New[string, *SubscriberData]() userData = hashmap.New[syntax.DID, *UserData]() likeStream *client.Client followStream *client.Client upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } logger *slog.Logger ) func getSubscriberDids() []string { _dids := make(Set[string], subscribers.Len()) subscribers.Range(func(s string, sd *SubscriberData) bool { _dids[string(sd.SubscribedTo)] = struct{}{} return true }) dids := make([]string, 0, len(_dids)) for k := range _dids { dids = append(dids, k) } return dids } func getUserData(did syntax.DID) *UserData { ud, _ := userData.GetOrInsert(did, &UserData{ targets: hashmap.New[string, *SubscriberData](), likes: make(map[syntax.RecordKey]bsky.FeedLike), follows: hashmap.New[syntax.RecordKey, bsky.GraphFollow](), }) return ud } func startListeningTo(sid string, sd *SubscriberData, did syntax.DID) { ud := getUserData(did) ud.targets.Insert(sid, sd) } func stopListeningTo(sid string, did syntax.DID) { if ud, exists := userData.Get(did); exists { ud.targets.Del(sid) } } func main() { logger = slog.Default() go startJetstreamLoop(logger, &likeStream, "like_tracker", HandleLikeEvent, getLikeStreamOpts) go startJetstreamLoop(logger, &followStream, "subscriber", HandleFollowEvent, getFollowStreamOpts) r := mux.NewRouter() r.HandleFunc("/subscribe/{did}", handleSubscribe).Methods("GET") log.Println("server starting on :8080") if err := http.ListenAndServe(":8080", r); err != nil { log.Fatalf("error while serving: %s", err) } } func handleSubscribe(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) did, err := syntax.ParseDID(vars["did"]) if err != nil { http.Error(w, "not a valid did", http.StatusBadRequest) return } sid := uuid.New().String() query := r.URL.Query() listenType := query.Get("listenTo") if len(listenType) == 0 { listenType = ListenTypeFollows } logger := logger.With("did", did, "subscriberId", sid) conn, err := upgrader.Upgrade(w, r, nil) if err != nil { logger.Error("WebSocket upgrade failed", "error", err) return } defer conn.Close() logger.Info("new subscriber") pdsURI, err := findUserPDS(r.Context(), did) if err != nil { logger.Error("cant resolve user pds", "error", err) return } logger = logger.With("pds", pdsURI) xrpcClient := &xrpc.Client{ Host: pdsURI, } ud := getUserData(did) sd := &SubscriberData{ SubscribedTo: did, Conn: conn, ListenType: listenType, } switch listenType { case ListenTypeFollows: follows, err := fetchFollows(r.Context(), xrpcClient, ud.followsCursor.Load(), did) if err != nil { logger.Error("error fetching follows", "error", err) return } sd.ListenTo = make(Set[syntax.DID]) if len(follows) > 0 { // store cursor for later requests so we dont have to fetch the whole thing again ud.followsCursor.Store((*string)(&follows[len(follows)-1].rkey)) for _, f := range follows { ud.follows.Insert(f.rkey, f.follow) sd.ListenTo[syntax.DID(f.follow.Subject)] = struct{}{} } } logger.Info("fetched follows") case ListenTypeNone: sd.ListenTo = make(Set[syntax.DID]) default: http.Error(w, "invalid listen type", http.StatusBadRequest) return } subscribers.Set(sid, sd) for listenDid := range sd.ListenTo { startListeningTo(sid, sd, listenDid) } updateFollowStreamOpts() // delete subscriber after we are done defer func() { for listenDid := range sd.ListenTo { stopListeningTo(sid, listenDid) } subscribers.Del(sid) updateFollowStreamOpts() }() logger.Info("serving subscriber") for { var msg SubscriberMessage err := conn.ReadJSON(&msg) if err != nil { logger.Info("WebSocket connection closed", "error", err) break } switch msg.Type { case "update_listen_to": // only allow this if we arent managing listen to if sd.ListenType != ListenTypeNone { continue } var innerMsg SubscriberUpdateListenTo if err := json.Unmarshal(msg.Content, &innerMsg); err != nil { logger.Info("invalid message", "error", err) break } // remove all current listens and add the ones the user requested for listenDid := range sd.ListenTo { stopListeningTo(sid, listenDid) delete(sd.ListenTo, listenDid) } for _, listenDid := range innerMsg.ListenTo { sd.ListenTo[listenDid] = struct{}{} startListeningTo(sid, sd, listenDid) } } } } func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload { return models.SubscriberOptionsUpdatePayload{ WantedCollections: []string{"app.bsky.feed.like"}, } } func getFollowStreamOpts() models.SubscriberOptionsUpdatePayload { return models.SubscriberOptionsUpdatePayload{ WantedCollections: []string{"app.bsky.graph.follow"}, WantedDIDs: getSubscriberDids(), } } func updateFollowStreamOpts() { opts := getFollowStreamOpts() err := followStream.SendOptionsUpdate(opts) if err != nil { logger.Error("couldnt update follow stream opts", "error", err) return } logger.Info("updated follow stream opts", "userCount", len(opts.WantedDIDs)) } func HandleLikeEvent(ctx context.Context, event *models.Event) error { if event == nil || event.Commit == nil { return nil } byDid := syntax.DID(event.Did) // skip handling event if its not from a source we are listening to ud, exists := userData.Get(byDid) if !exists || ud.targets.Len() == 0 { return nil } deleted := event.Commit.Operation == models.CommitOperationDelete rkey := syntax.RecordKey(event.Commit.RKey) var like bsky.FeedLike if deleted { if l, exists := ud.likes[rkey]; exists { like = l defer delete(ud.likes, rkey) } else { logger.Error("like record not found", "rkey", rkey) return nil } } else { if err := json.Unmarshal(event.Commit.Record, &like); err != nil { logger.Error("failed to unmarshal like", "error", err) return nil } } // if there is no via it means its not a repost anyway if like.Via == nil { return nil } // store for later when it gets deleted so we can fetch the record if !deleted { ud.likes[rkey] = like } repostURI := syntax.ATURI(like.Via.Uri) // if not a repost we dont care if repostURI.Collection() != "app.bsky.feed.repost" { return nil } reposterDID, err := repostURI.Authority().AsDID() if err != nil { return err } ud.targets.Range(func(sid string, sd *SubscriberData) bool { if sd.SubscribedTo != reposterDID { return true } notification := NotificationMessage{ Liked: !deleted, ByDid: byDid, RepostURI: repostURI, } if err := sd.Conn.WriteJSON(notification); err != nil { logger.Error("failed to send notification", "subscriber", sd.SubscribedTo, "error", err) } return true }) return nil } func HandleFollowEvent(ctx context.Context, event *models.Event) error { if event == nil || event.Commit == nil { return nil } byDid := syntax.DID(event.Did) ud, exists := userData.Get(byDid) if !exists || ud.targets.Len() == 0 { return nil } deleted := event.Commit.Operation == models.CommitOperationDelete rkey := syntax.RecordKey(event.Commit.RKey) switch event.Commit.Collection { case "app.bsky.graph.follow": var r bsky.GraphFollow if deleted { if f, exists := ud.follows.Get(rkey); exists { r = f } else { logger.Error("follow record not found", "rkey", rkey) return nil } ud.follows.Del(rkey) } else { if err := unmarshalEvent(event, &r); err != nil { logger.Error("could not unmarshal follow event", "error", err) return nil } ud.follows.Insert(rkey, r) } ud.targets.Range(func(sid string, sd *SubscriberData) bool { // if we arent managing then we dont need to update anything if sd.ListenType != ListenTypeFollows { return true } subjectDid := syntax.DID(r.Subject) if deleted { stopListeningTo(sid, subjectDid) delete(sd.ListenTo, subjectDid) } else { sd.ListenTo[subjectDid] = struct{}{} startListeningTo(sid, sd, subjectDid) } return true }) } return nil } func unmarshalEvent[v any](event *models.Event, val *v) error { if err := json.Unmarshal(event.Commit.Record, val); err != nil { logger.Error("failed to unmarshal", "error", err, "raw", event.Commit.Record) return nil } return nil }