// Command main serves a WebSocket endpoint that notifies a subscriber when an
// actor it listens to likes (or unlikes) a post via one of the subscriber's
// reposts.
package main

import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"log"
	"log/slog"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"github.com/bluesky-social/indigo/api/bsky"
	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/bluesky-social/indigo/xrpc"
	"github.com/bluesky-social/jetstream/pkg/models"
	"github.com/cornelk/hashmap"
	"github.com/google/uuid"
	"github.com/gorilla/mux"
	"github.com/gorilla/websocket"
)

// Set is a minimal set type backed by a map with empty-struct values.
type Set[T comparable] map[T]struct{}

// keys returns the members of s in unspecified order.
func (s Set[T]) keys() []T {
	out := make([]T, 0, len(s))
	for k := range s {
		out = append(out, k)
	}
	return out
}

// Listen types accepted in the "listenTo" query parameter of /subscribe.
const (
	// ListenTypeNone means the subscriber supplies its own listen list through
	// "update_listen_to" messages.
	ListenTypeNone = "none"
	// ListenTypeFollows means the listen list is derived from the follows of
	// the subscribed actor and kept up to date by HandleFollowEvent.
	ListenTypeFollows = "follows"
)

const (
	// subscriberPingInterval is the delay between WebSocket pings.
	subscriberPingInterval = 15 * time.Second
	// subscriberWriteTimeout bounds a single write to a subscriber so that a
	// stalled client cannot block the caller indefinitely.
	subscriberWriteTimeout = 10 * time.Second
)

// SubscriberData is the per-connection state of one WebSocket subscriber.
type SubscriberData struct {
	forActor   syntax.DID
	conn       *websocket.Conn
	listenType string

	// mu guards listenTo and closed. listenTo is read by getLikeDids from
	// other subscribers' goroutines, so it must not be touched unlocked once
	// the subscriber has been published in the subscribers map.
	mu       sync.Mutex
	listenTo Set[syntax.DID]
	closed   bool

	// writeMu serializes data writes on conn; a gorilla/websocket connection
	// supports only one concurrent writer.
	writeMu sync.Mutex
}

// writeJSON sends v to the subscriber as a JSON text message.
func (sd *SubscriberData) writeJSON(v any) error {
	sd.writeMu.Lock()
	defer sd.writeMu.Unlock()

	if err := sd.conn.SetWriteDeadline(time.Now().Add(subscriberWriteTimeout)); err != nil {
		return err
	}
	return sd.conn.WriteJSON(v)
}

// ping sends a WebSocket ping. WriteControl may be called concurrently with
// other connection methods, so writeMu is not needed here.
func (sd *SubscriberData) ping() error {
	return sd.conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(subscriberWriteTimeout))
}

// markAll registers the subscriber as a target of every DID in listenTo.
func (sd *SubscriberData) markAll(sid string) {
	sd.mu.Lock()
	defer sd.mu.Unlock()

	for did := range sd.listenTo {
		markActorForLikes(sid, sd, did)
	}
}

// addListen starts listening to did. It is a no-op after release.
func (sd *SubscriberData) addListen(sid string, did syntax.DID) {
	sd.mu.Lock()
	defer sd.mu.Unlock()

	if sd.closed {
		return
	}
	sd.listenTo[did] = struct{}{}
	markActorForLikes(sid, sd, did)
}

// removeListen stops listening to did.
func (sd *SubscriberData) removeListen(sid string, did syntax.DID) {
	sd.mu.Lock()
	defer sd.mu.Unlock()

	unmarkActorForLikes(sid, did)
	delete(sd.listenTo, did)
}

// replaceListens drops every current listen and installs dids instead. It is
// a no-op after release.
func (sd *SubscriberData) replaceListens(sid string, dids []syntax.DID) {
	sd.mu.Lock()
	defer sd.mu.Unlock()

	if sd.closed {
		return
	}
	for did := range sd.listenTo {
		unmarkActorForLikes(sid, did)
		delete(sd.listenTo, did)
	}
	for _, did := range dids {
		sd.listenTo[did] = struct{}{}
		markActorForLikes(sid, sd, did)
	}
}

// release unregisters the subscriber from every actor it listens to and marks
// it closed so that late follow events cannot register it again.
func (sd *SubscriberData) release(sid string) {
	sd.mu.Lock()
	defer sd.mu.Unlock()

	sd.closed = true
	for did := range sd.listenTo {
		unmarkActorForLikes(sid, did)
	}
}

// addListenDIDsTo copies the listened-to DIDs into dst.
func (sd *SubscriberData) addListenDIDsTo(dst Set[string]) {
	sd.mu.Lock()
	defer sd.mu.Unlock()

	for did := range sd.listenTo {
		dst[string(did)] = struct{}{}
	}
}

// ActorData is the cached state kept per actor DID.
type ActorData struct {
	// targets holds the subscribers (keyed by subscriber id) that listen to
	// this actor's likes.
	targets *hashmap.Map[string, *SubscriberData]
	// likes holds like records seen from this actor, kept so that a later
	// delete event can be resolved back to the record.
	likes *hashmap.Map[syntax.RecordKey, bsky.FeedLike]
	// follows holds the known follow records of this actor.
	follows *hashmap.Map[syntax.RecordKey, bsky.GraphFollow]
	// followsCursor is the cursor passed to fetchFollows on later requests.
	followsCursor atomic.Pointer[string]

	profile          *bsky.ActorDefs_ProfileViewDetailed
	profileFetchedAt time.Time
}

// NotificationActor identifies the actor that triggered a notification.
type NotificationActor struct {
	DID     syntax.DID                          `json:"did"`     // the DID of the actor that (un)liked the post
	Profile *bsky.ActorDefs_ProfileViewDetailed `json:"profile"` // the detailed profile of the actor that (un)liked the post
}

// NotificationMessage is the payload sent to a subscriber for a like event.
type NotificationMessage struct {
	Liked  bool              `json:"liked"`  // whether the message was liked or unliked
	Actor  NotificationActor `json:"actor"`  // information about the actor that (un)liked
	Record bsky.FeedLike     `json:"record"` // the raw like record
	Time   int64             `json:"time"`   // when the like event came in
}

// SubscriberMessage is the envelope of a message received from a subscriber.
type SubscriberMessage struct {
	Type    string          `json:"type"`
	Content json.RawMessage `json:"content"`
}

// SubscriberUpdateListenTo is the content of an "update_listen_to" message.
type SubscriberUpdateListenTo struct {
	ListenTo []syntax.DID `json:"listen_to"`
}

var (
	// The same *SubscriberData is stored both in subscribers and in the
	// targets map of every actor it listens to; its mutable fields are
	// guarded by the mutexes on SubscriberData.
	subscribers = hashmap.New[string, *SubscriberData]()
	actorData   = hashmap.New[syntax.DID, *ActorData]()

	likeStreams   *StreamManager
	followStreams *StreamManager

	upgrader = websocket.Upgrader{
		// NOTE(review): every origin is accepted; confirm this is intended
		// before exposing the endpoint to browsers.
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}

	logger *slog.Logger
)

// getSubscriberDids returns the distinct DIDs that currently have at least one
// subscriber, in unspecified order.
func getSubscriberDids() []string {
	dids := make(Set[string], subscribers.Len())
	subscribers.Range(func(_ string, sd *SubscriberData) bool {
		dids[string(sd.forActor)] = struct{}{}
		return true
	})
	return dids.keys()
}

// getLikeDids returns the distinct DIDs that any subscriber listens to, in
// unspecified order.
func getLikeDids() []string {
	// 5000 entries per subscriber is the original size hint.
	dids := make(Set[string], subscribers.Len()*5000)
	subscribers.Range(func(_ string, sd *SubscriberData) bool {
		sd.addListenDIDsTo(dids)
		return true
	})
	return dids.keys()
}

// getActorData returns the ActorData for did, creating it if necessary.
func getActorData(did syntax.DID) *ActorData {
	// Fast path: avoid allocating three maps when the entry already exists.
	if ud, ok := actorData.Get(did); ok {
		return ud
	}
	ud, _ := actorData.GetOrInsert(did, &ActorData{
		targets: hashmap.New[string, *SubscriberData](),
		likes:   hashmap.New[syntax.RecordKey, bsky.FeedLike](),
		follows: hashmap.New[syntax.RecordKey, bsky.GraphFollow](),
	})
	return ud
}

// markActorForLikes registers subscriber sid as a target of did's likes.
func markActorForLikes(sid string, sd *SubscriberData, did syntax.DID) {
	getActorData(did).targets.Insert(sid, sd)
}

// unmarkActorForLikes removes subscriber sid from the targets of did's likes.
func unmarkActorForLikes(sid string, did syntax.DID) {
	if ud, exists := actorData.Get(did); exists {
		ud.targets.Del(sid)
	}
}

func main() {
	logger = slog.Default()

	likeStreams = NewStreamManager(logger, "like-tracker", HandleLikeEvent, getLikeStreamOpts)
	followStreams = NewStreamManager(logger, "subscriber", HandleFollowEvent, getFollowStreamOpts)

	r := mux.NewRouter()
	r.HandleFunc("/subscribe/{did}", handleSubscribe).Methods("GET")

	log.Println("server starting on :8080")
	if err := http.ListenAndServe(":8080", r); err != nil {
		log.Fatalf("error while serving: %s", err)
	}
}

// handleSubscribe upgrades GET /subscribe/{did} to a WebSocket and serves the
// subscriber until the connection fails. The optional "listenTo" query
// parameter selects the listen type and defaults to ListenTypeFollows.
func handleSubscribe(w http.ResponseWriter, r *http.Request) {
	did, err := syntax.ParseDID(mux.Vars(r)["did"])
	if err != nil {
		http.Error(w, "not a valid did", http.StatusBadRequest)
		return
	}

	listenType := r.URL.Query().Get("listenTo")
	if listenType == "" {
		listenType = ListenTypeFollows
	}
	// Validate before upgrading: once the connection is hijacked an HTTP
	// error response can no longer be delivered.
	switch listenType {
	case ListenTypeFollows, ListenTypeNone:
	default:
		http.Error(w, "invalid listen type", http.StatusBadRequest)
		return
	}

	sid := uuid.New().String()
	logger := logger.With("did", did, "subscriberId", sid)

	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		logger.Error("WebSocket upgrade failed", "error", err)
		return
	}
	defer conn.Close()

	logger.Info("new subscriber")

	pdsURI, err := findUserPDS(r.Context(), did)
	if err != nil {
		logger.Error("cant resolve user pds", "error", err)
		return
	}
	logger = logger.With("pds", pdsURI)

	xrpcClient := &xrpc.Client{
		Host: pdsURI,
	}

	ud := getActorData(did)
	sd := &SubscriberData{
		forActor:   did,
		conn:       conn,
		listenType: listenType,
		listenTo:   make(Set[syntax.DID]),
	}

	if listenType == ListenTypeFollows {
		follows, err := fetchFollows(r.Context(), xrpcClient, ud.followsCursor.Load(), did)
		if err != nil {
			logger.Error("error fetching follows", "error", err)
			return
		}
		// Start from the follows we already have stored.
		ud.follows.Range(func(_ syntax.RecordKey, f bsky.GraphFollow) bool {
			sd.listenTo[syntax.DID(f.Subject)] = struct{}{}
			return true
		})
		if len(follows) > 0 {
			// Store the cursor for later requests so we dont have to fetch
			// the whole thing again. Copy it so the stored pointer does not
			// keep the fetched slice alive.
			cursor := string(follows[len(follows)-1].rkey)
			ud.followsCursor.Store(&cursor)
			for _, f := range follows {
				ud.follows.Insert(f.rkey, f.follow)
				sd.listenTo[syntax.DID(f.follow.Subject)] = struct{}{}
			}
		}
		logger.Info("fetched follows")
	}

	subscribers.Set(sid, sd)
	sd.markAll(sid)
	updateStreamOpts()

	// Delete the subscriber after we are done.
	defer func() {
		sd.release(sid)
		subscribers.Del(sid)
		updateStreamOpts()
	}()

	logger.Info("serving subscriber")

	// Send pings until the handler returns or a ping fails.
	done := make(chan struct{})
	defer close(done)
	go func() {
		ticker := time.NewTicker(subscriberPingInterval)
		defer ticker.Stop()
		for {
			if err := sd.ping(); err != nil {
				logger.Debug("ping failed", "error", err)
				// Closing the connection unblocks the read loop below.
				conn.Close()
				return
			}
			select {
			case <-done:
				return
			case <-ticker.C:
			}
		}
	}()

	for {
		var msg SubscriberMessage
		if err := conn.ReadJSON(&msg); err != nil {
			// Dont kill the websocket if the message had no value.
			if errors.Is(err, io.ErrUnexpectedEOF) {
				continue
			}
			logger.Error("WebSocket connection closed", "error", err)
			return
		}

		switch msg.Type {
		case "update_listen_to":
			// Only allow this if we arent managing the listen list.
			if sd.listenType != ListenTypeNone {
				continue
			}

			var innerMsg SubscriberUpdateListenTo
			if err := json.Unmarshal(msg.Content, &innerMsg); err != nil {
				logger.Debug("invalid message", "error", err)
				continue
			}

			// Remove all current listens and add the ones the user requested.
			sd.replaceListens(sid, innerMsg.ListenTo)
			updateStreamOpts()
		}
	}
}

// getLikeStreamOpts builds the options for the like stream: like records from
// every DID any subscriber listens to.
func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload {
	return models.SubscriberOptionsUpdatePayload{
		WantedCollections: []string{"app.bsky.feed.like"},
		WantedDIDs:        getLikeDids(),
	}
}

// getFollowStreamOpts builds the options for the follow stream: follow records
// from every DID that has a subscriber.
func getFollowStreamOpts() models.SubscriberOptionsUpdatePayload {
	return models.SubscriberOptionsUpdatePayload{
		WantedCollections: []string{"app.bsky.graph.follow"},
		WantedDIDs:        getSubscriberDids(),
	}
}

// updateStreamOpts pushes the current options to both streams.
func updateStreamOpts() {
	likeStreams.updateOpts()
	followStreams.updateOpts()
}

// HandleLikeEvent processes a like commit event. When the like was made via a
// repost (like.Via points at an app.bsky.feed.repost record), every subscriber
// that listens to the liking actor and whose actor authored that repost is
// sent a NotificationMessage. Events from actors nobody listens to are
// ignored.
func HandleLikeEvent(ctx context.Context, event *models.Event) error {
	if event == nil || event.Commit == nil {
		return nil
	}

	byDid := syntax.DID(event.Did)
	// Skip handling the event if its not from a source we are listening to.
	ud, exists := actorData.Get(byDid)
	if !exists || ud.targets.Len() == 0 {
		return nil
	}

	logger := logger.With("actor", byDid, "type", "like")

	deleted := event.Commit.Operation == models.CommitOperationDelete
	rkey := syntax.RecordKey(event.Commit.RKey)

	var like bsky.FeedLike
	if deleted {
		stored, found := ud.likes.Get(rkey)
		if !found {
			// Expected for most unlikes: only likes carrying a via reference
			// are stored below.
			logger.Debug("like record not found", "rkey", rkey)
			return nil
		}
		like = stored
		defer ud.likes.Del(rkey)
	} else if err := unmarshalEvent(event, &like); err != nil {
		return nil
	}

	// If there is no via it means its not a repost anyway.
	if like.Via == nil {
		return nil
	}

	// Store for later when it gets deleted so we can fetch the record.
	if !deleted {
		ud.likes.Insert(rkey, like)
	}

	repostURI := syntax.ATURI(like.Via.Uri)
	// If not a repost we dont care.
	if repostURI.Collection() != "app.bsky.feed.repost" {
		return nil
	}
	reposterDID, err := repostURI.Authority().AsDID()
	if err != nil {
		return err
	}

	ud.targets.Range(func(_ string, sd *SubscriberData) bool {
		if sd.forActor != reposterDID {
			return true
		}

		// Refresh the cached profile at most once every 24 hours; on failure
		// the previous (possibly nil) profile is sent.
		if ud.profile == nil || time.Since(ud.profileFetchedAt) > time.Hour*24 {
			profile, err := fetchProfile(ctx, byDid)
			if err != nil {
				logger.Error("cant fetch profile", "error", err)
			} else {
				ud.profile = profile
				ud.profileFetchedAt = time.Now()
			}
		}

		notification := NotificationMessage{
			Liked: !deleted,
			Actor: NotificationActor{
				DID:     byDid,
				Profile: ud.profile,
			},
			Record: like,
			Time:   event.TimeUS,
		}

		if err := sd.writeJSON(notification); err != nil {
			logger.Error("failed to send notification", "error", err)
		}
		return true
	})

	return nil
}

// HandleFollowEvent processes a follow commit event from a subscribed actor.
// It keeps the actor's cached follows current and adds or removes the followed
// DID on every ListenTypeFollows subscriber attached to that actor.
//
// NOTE(review): the like stream options are not refreshed here, so a new
// follow only reaches the like stream on the next updateStreamOpts call;
// confirm whether updateOpts may be called from a stream handler.
func HandleFollowEvent(ctx context.Context, event *models.Event) error {
	if event == nil || event.Commit == nil {
		return nil
	}
	if event.Commit.Collection != "app.bsky.graph.follow" {
		return nil
	}

	byDid := syntax.DID(event.Did)
	// Actor data exists for every DID that has had a subscriber; anything
	// else is not ours to track.
	ud, exists := actorData.Get(byDid)
	if !exists {
		return nil
	}

	deleted := event.Commit.Operation == models.CommitOperationDelete
	rkey := syntax.RecordKey(event.Commit.RKey)

	var follow bsky.GraphFollow
	if deleted {
		stored, found := ud.follows.Get(rkey)
		if !found {
			// Most likely no ListenTypeFollows subscriber attached on actor.
			logger.Warn("follow record not found", "rkey", rkey, "actor", byDid)
			return nil
		}
		follow = stored
		ud.follows.Del(rkey)
	} else {
		if err := unmarshalEvent(event, &follow); err != nil {
			return nil
		}
		ud.follows.Insert(rkey, follow)
	}

	subjectDid := syntax.DID(follow.Subject)
	// The follow belongs to byDid, so only subscribers attached to byDid are
	// affected. ud.targets would be the wrong set here: it holds subscribers
	// listening to byDid's likes.
	subscribers.Range(func(sid string, sd *SubscriberData) bool {
		// If we arent managing the listen list we dont need to update anything.
		if sd.forActor != byDid || sd.listenType != ListenTypeFollows {
			return true
		}
		if deleted {
			sd.removeListen(sid, subjectDid)
		} else {
			sd.addListen(sid, subjectDid)
		}
		return true
	})

	return nil
}

// unmarshalEvent decodes the commit record of event into val, logging any
// decode error before returning it.
func unmarshalEvent[v any](event *models.Event, val *v) error {
	if err := json.Unmarshal(event.Commit.Record, val); err != nil {
		logger.Error("cant unmarshal record", "error", err, "raw", event.Commit.Record)
		return err
	}
	return nil
}