package main import ( "context" "encoding/json" "log" "log/slog" "net/http" "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/indigo/atproto/syntax" "github.com/bluesky-social/indigo/xrpc" "github.com/bluesky-social/jetstream/pkg/client" "github.com/bluesky-social/jetstream/pkg/models" "github.com/cornelk/hashmap" "github.com/gorilla/mux" "github.com/gorilla/websocket" ) type Set[T comparable] map[T]struct{} const ListenTypeNone = "none" const ListenTypeFollows = "follows" type SubscriberData struct { DID syntax.DID Conn *websocket.Conn ListenType string ListenTo Set[syntax.DID] follows map[syntax.RecordKey]bsky.GraphFollow } type ListeneeData struct { targets *hashmap.Map[syntax.DID, *SubscriberData] likes map[syntax.RecordKey]bsky.FeedLike } type NotificationMessage struct { Liked bool `json:"liked"` ByDid syntax.DID `json:"did"` RepostURI syntax.ATURI `json:"repost_uri"` } type SubscriberMessage struct { Type string `json:"type"` Content json.RawMessage `json:"content"` } type SubscriberUpdateListenTo struct { ListenTo []syntax.DID `json:"listen_to"` } var ( // storing the subscriber data in both Should Be Fine // we dont modify subscriber data at the same time in two places subscribers = hashmap.New[syntax.DID, *SubscriberData]() listeningTo = hashmap.New[syntax.DID, *ListeneeData]() likeStream *client.Client subscriberStream *client.Client upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } logger *slog.Logger ) func getSubscriberDids() []string { dids := make([]string, 0, subscribers.Len()) subscribers.Range(func(s syntax.DID, sd *SubscriberData) bool { dids = append(dids, string(s)) return true }) return dids } func startListeningTo(sd *SubscriberData, did syntax.DID) { ld, _ := listeningTo.GetOrInsert(did, &ListeneeData{ targets: hashmap.New[syntax.DID, *SubscriberData](), likes: make(map[syntax.RecordKey]bsky.FeedLike), }) ld.targets.Insert(sd.DID, sd) } func stopListeningTo(subscriberDid, did syntax.DID) { if ld, exists := listeningTo.Get(did); exists { ld.targets.Del(subscriberDid) } } func main() { logger = slog.Default() go startJetstreamLoop(logger, &likeStream, "like_tracker", HandleLikeEvent, getLikeStreamOpts) go startJetstreamLoop(logger, &subscriberStream, "subscriber", HandleSubscriberEvent, getSubscriberStreamOpts) r := mux.NewRouter() r.HandleFunc("/subscribe/{did}", handleSubscribe).Methods("GET") log.Println("server starting on :8080") if err := http.ListenAndServe(":8080", r); err != nil { log.Fatalf("error while serving: %s", err) } } func handleSubscribe(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) did, err := syntax.ParseDID(vars["did"]) if err != nil { http.Error(w, "not a valid did", http.StatusBadRequest) return } query := r.URL.Query() listenType := query.Get("listenTo") if len(listenType) == 0 { listenType = ListenTypeFollows } logger := logger.With("did", did) conn, err := upgrader.Upgrade(w, r, nil) if err != nil { logger.Error("WebSocket upgrade failed", "error", err) return } defer conn.Close() logger.Info("new subscriber") pdsURI, err := findUserPDS(r.Context(), did) if err != nil { logger.Error("cant resolve user pds", "error", err) return } logger = logger.With("pds", pdsURI) xrpcClient := &xrpc.Client{ Host: pdsURI, } sd := &SubscriberData{ DID: did, Conn: conn, ListenType: listenType, } switch listenType { case ListenTypeFollows: follows, err := fetchFollows(r.Context(), xrpcClient, did) if err != nil { logger.Error("error fetching follows", "error", err) return } logger.Info("fetched follows") sd.follows = follows sd.ListenTo = make(Set[syntax.DID]) for _, follow := range follows { sd.ListenTo[syntax.DID(follow.Subject)] = struct{}{} } case ListenTypeNone: sd.ListenTo = make(Set[syntax.DID]) default: http.Error(w, "invalid listen type", http.StatusBadRequest) return } subscribers.Set(sd.DID, sd) for listenDid := range sd.ListenTo { startListeningTo(sd, listenDid) } updateSubscriberStreamOpts() // delete subscriber after we are done defer func() { for listenDid := range sd.ListenTo { stopListeningTo(sd.DID, listenDid) } subscribers.Del(sd.DID) updateSubscriberStreamOpts() }() logger.Info("serving subscriber") for { var msg SubscriberMessage err := conn.ReadJSON(&msg) if err != nil { logger.Info("WebSocket connection closed", "error", err) break } switch msg.Type { case "update_listen_to": // only allow this if we arent managing listen to if sd.ListenType != ListenTypeNone { continue } var innerMsg SubscriberUpdateListenTo if err := json.Unmarshal(msg.Content, &innerMsg); err != nil { logger.Info("invalid message", "error", err) break } // remove all current listens and add the ones the user requested for listenDid := range sd.ListenTo { stopListeningTo(sd.DID, listenDid) delete(sd.ListenTo, listenDid) } for _, listenDid := range innerMsg.ListenTo { sd.ListenTo[listenDid] = struct{}{} startListeningTo(sd, listenDid) } } } } func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload { return models.SubscriberOptionsUpdatePayload{ WantedCollections: []string{"app.bsky.feed.like"}, } } func getSubscriberStreamOpts() models.SubscriberOptionsUpdatePayload { return models.SubscriberOptionsUpdatePayload{ WantedCollections: []string{"app.bsky.feed.repost", "app.bsky.graph.follow"}, WantedDIDs: getSubscriberDids(), } } func updateSubscriberStreamOpts() { opts := getSubscriberStreamOpts() err := subscriberStream.SendOptionsUpdate(opts) if err != nil { logger.Error("couldnt update subscriber stream opts", "error", err) return } logger.Info("updated subscriber stream opts", "userCount", len(opts.WantedDIDs)) } func HandleLikeEvent(ctx context.Context, event *models.Event) error { if event == nil || event.Commit == nil { return nil } byDid := syntax.DID(event.Did) // skip handling event if its not from a source we are listening to ld, exists := listeningTo.Get(byDid) if !exists { return nil } deleted := event.Commit.Operation == models.CommitOperationDelete rkey := syntax.RecordKey(event.Commit.RKey) var like bsky.FeedLike if deleted { if l, exists := ld.likes[rkey]; exists { like = l defer delete(ld.likes, rkey) } else { logger.Error("like record not found", "rkey", rkey) return nil } } else { if err := json.Unmarshal(event.Commit.Record, &like); err != nil { logger.Error("failed to unmarshal like", "error", err) return nil } } // if there is no via it means its not a repost anyway if like.Via == nil { return nil } // store for later when it gets deleted so we can fetch the record if !deleted { ld.likes[rkey] = like } repostURI := syntax.ATURI(like.Via.Uri) // if not a repost we dont care if repostURI.Collection() != "app.bsky.feed.repost" { return nil } reposterDID, err := repostURI.Authority().AsDID() if err != nil { return err } if sd, exists := ld.targets.Get(reposterDID); exists { notification := NotificationMessage{ Liked: !deleted, ByDid: byDid, RepostURI: repostURI, } if err := sd.Conn.WriteJSON(notification); err != nil { logger.Error("failed to send notification", "subscriber", sd.DID, "error", err) } } return nil } func HandleSubscriberEvent(ctx context.Context, event *models.Event) error { if event == nil || event.Commit == nil { return nil } byDid := syntax.DID(event.Did) sd, exists := subscribers.Get(byDid) if !exists { return nil } deleted := event.Commit.Operation == models.CommitOperationDelete rkey := syntax.RecordKey(event.Commit.RKey) switch event.Commit.Collection { case "app.bsky.graph.follow": // if we arent managing then we dont need to update anything if sd.ListenType != ListenTypeFollows { return nil } var r bsky.GraphFollow if deleted { if f, exists := sd.follows[rkey]; exists { r = f } else { logger.Error("follow record not found", "rkey", rkey) return nil } subjectDid := syntax.DID(r.Subject) stopListeningTo(sd.DID, subjectDid) delete(sd.ListenTo, subjectDid) delete(sd.follows, rkey) } else { if err := unmarshalEvent(event, &r); err != nil { return err } subjectDid := syntax.DID(r.Subject) sd.ListenTo[subjectDid] = struct{}{} sd.follows[rkey] = r startListeningTo(sd, subjectDid) } } return nil } func unmarshalEvent[v any](event *models.Event, val *v) error { if err := json.Unmarshal(event.Commit.Record, val); err != nil { logger.Error("failed to unmarshal", "error", err, "raw", event.Commit.Record) return nil } return nil }