package main import ( "context" "encoding/json" "fmt" "log" "log/slog" "net/http" "sync" "time" "github.com/bluesky-social/indigo/api/atproto" "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/indigo/xrpc" "github.com/bluesky-social/jetstream/pkg/client" "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" "github.com/bluesky-social/jetstream/pkg/models" "github.com/gorilla/mux" "github.com/gorilla/websocket" ) type Set[T comparable] map[T]struct{} // Data structures type SubscriberData struct { DID string Conn *websocket.Conn Follows Set[string] Reposts Set[string] } type NotificationMessage struct { Liked bool `json:"liked"` ByDid string `json:"did"` RepostURI string `json:"repost_uri"` } // Global state var ( subscribers = make(map[string]*SubscriberData) subscribersMux sync.RWMutex likeStream *client.Client subscriberStream *client.Client xrpcClient *xrpc.Client upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } logger *slog.Logger ) func main() { logger = slog.Default() xrpcClient = &xrpc.Client{ Client: &http.Client{ Timeout: 30 * time.Second, }, Host: "https://bsky.social", } if err := initializeJetstreams(); err != nil { log.Fatalf("cannot start jetstream: %s", err) } r := mux.NewRouter() r.HandleFunc("/subscribe/{did}", handleSubscribe).Methods("GET") log.Println("Server starting on :8080") if err := http.ListenAndServe(":8080", r); err != nil { log.Fatalf("error while serving: %s", err) } } func handleSubscribe(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) did := vars["did"] conn, err := upgrader.Upgrade(w, r, nil) if err != nil { logger.Error("WebSocket upgrade failed", "error", err) return } defer conn.Close() logger.Info("New subscriber", "did", did) follows, err := fetchFollows(r.Context(), did) if err != nil { logger.Error("Error fetching follows", "did", did, "error", err) return } reposts, err := fetchReposts(r.Context(), did) if err != nil { logger.Error("Error fetching reposts", "did", did, "error", err) return } // Store subscriber data subscriber := &SubscriberData{ DID: did, Conn: conn, Follows: follows, Reposts: reposts, } subscribersMux.Lock() subscribers[did] = subscriber subscribersMux.Unlock() updateSubscriberStreamOpts() // delete subscriber after we are done defer func() { subscribersMux.Lock() delete(subscribers, did) subscribersMux.Unlock() updateSubscriberStreamOpts() }() for { _, _, err := conn.ReadMessage() if err != nil { logger.Info("WebSocket connection closed", "did", did, "error", err) break } } } func fetchReposts(ctx context.Context, did string) (Set[string], error) { all := make(Set[string]) cursor := "" for { out, err := atproto.RepoListRecords(ctx, &xrpc.Client{}, "app.bsky.feed.repost", cursor, 100, did, false) if err != nil { return nil, err } for _, record := range out.Records { all[record.Uri] = struct{}{} } if out.Cursor == nil || *out.Cursor == "" { break } cursor = *out.Cursor } return all, nil } func fetchFollows(ctx context.Context, did string) (Set[string], error) { all := make(Set[string]) cursor := "" for { out, err := bsky.GraphGetFollows(ctx, &xrpc.Client{}, did, cursor, 100) if err != nil { return nil, err } for _, record := range out.Follows { all[record.Did] = struct{}{} } if out.Cursor == nil || *out.Cursor == "" { break } cursor = *out.Cursor } return all, nil } func initializeJetstreams() error { if err := startLikeClient(); err != nil { return fmt.Errorf("like stream: %w", err) } if err := startSubscriberClient(); err != nil { return fmt.Errorf("subscriber stream: %w", err) } return nil } func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload { return models.SubscriberOptionsUpdatePayload{ WantedCollections: []string{"app.bsky.feed.like"}, WantedDIDs: getFollowsDids(), } } func getSubscriberStreamOpts() models.SubscriberOptionsUpdatePayload { return models.SubscriberOptionsUpdatePayload{ WantedCollections: []string{"app.bsky.feed.repost", "app.bsky.graph.follow"}, WantedDIDs: getSubscriberDids(), } } func updateLikeStreamOpts() { err := likeStream.SendOptionsUpdate(getLikeStreamOpts()) if err != nil { // reinit like stream } } func updateSubscriberStreamOpts() { err := subscriberStream.SendOptionsUpdate(getSubscriberStreamOpts()) if err != nil { // reinit subscriber stream } } func startLikeClient() error { opts := getLikeStreamOpts() if len(opts.WantedDIDs) == 0 { return nil // No follows to track } handler := &likeHandler{} var err error likeStream, err = startJetstreamClient("like_tracker", opts, handler.HandleEvent) if err != nil { return err } return nil } func startSubscriberClient() error { opts := getSubscriberStreamOpts() if len(opts.WantedDIDs) == 0 { return nil // No subscribers to track } handler := &subscriberHandler{} var err error subscriberStream, err = startJetstreamClient("subscriber", opts, handler.HandleEvent) if err != nil { return err } return nil } func startJetstreamClient(name string, opts models.SubscriberOptionsUpdatePayload, handleEvent func(context.Context, *models.Event) error) (*client.Client, error) { ctx := context.Background() config := client.DefaultClientConfig() config.WebsocketURL = "wss://jetstream.atproto.tools/subscribe" config.Compress = true config.WantedCollections = opts.WantedCollections config.WantedDids = opts.WantedDIDs scheduler := sequential.NewScheduler(name, logger, handleEvent) c, err := client.NewClient(config, logger, scheduler) if err != nil { logger.Error("Failed to create client", "name", name, "error", err) return nil, err } cursor := time.Now().UnixMicro() logger.Info("Starting client", "name", name, "collections", opts.WantedCollections, "wanted_dids", len(opts.WantedDIDs)) if err := c.ConnectAndRead(ctx, &cursor); err != nil { logger.Error("Client failed", "name", name, "error", err) return nil, err } return c, nil } func getFollowsDids() []string { subscribersMux.RLock() defer subscribersMux.RUnlock() var dids []string for _, subscriber := range subscribers { for follow, _ := range subscriber.Follows { dids = append(dids, follow) } } return dids } func getSubscriberDids() []string { subscribersMux.RLock() defer subscribersMux.RUnlock() var dids []string for did := range subscribers { dids = append(dids, did) } return dids } type likeHandler struct{} func (h *likeHandler) HandleEvent(ctx context.Context, event *models.Event) error { var like bsky.FeedLike if err := json.Unmarshal(event.Commit.Record, &like); err != nil { logger.Error("Failed to unmarshal like", "error", err) return nil } subscribersMux.RLock() defer subscribersMux.RUnlock() for _, subscriber := range subscribers { for repostURI, _ := range subscriber.Reposts { // (un)liked a post the subscriber reposted if like.Subject.Uri == repostURI { notification := NotificationMessage{ Liked: event.Commit.Operation != models.CommitOperationDelete, ByDid: event.Did, RepostURI: repostURI, } if err := subscriber.Conn.WriteJSON(notification); err != nil { logger.Error("Failed to send notification", "subscriber", subscriber.DID, "error", err) } } } } return nil } type subscriberHandler struct{} func (h *subscriberHandler) HandleEvent(ctx context.Context, event *models.Event) error { switch event.Commit.Collection { case "app.bsky.feed.repost": modifySubscribersWithEvent( event, func(s *SubscriberData, r bsky.FeedRepost) { delete(s.Reposts, r.Subject.Uri) }, func(s *SubscriberData, r bsky.FeedRepost) { s.Reposts[r.Subject.Uri] = struct{}{} }, ) case "app.bsky.graph.follow": modifySubscribersWithEvent( event, func(s *SubscriberData, r bsky.GraphFollow) { delete(s.Follows, r.Subject) }, func(s *SubscriberData, r bsky.GraphFollow) { s.Follows[r.Subject] = struct{}{} }, ) updateLikeStreamOpts() } return nil } type ModifyFunc[v any] func(*SubscriberData, v) func modifySubscribersWithEvent[v any](event *models.Event, onDelete ModifyFunc[v], onUpdate ModifyFunc[v]) error { var data v if err := json.Unmarshal(event.Commit.Record, &data); err != nil { logger.Error("Failed to unmarshal repost", "error", err) return nil } subscribersMux.Lock() defer subscribersMux.Unlock() if subscriber, exists := subscribers[event.Did]; exists { if event.Commit.Operation == models.CommitOperationDelete { onDelete(subscriber, data) } else { onUpdate(subscriber, data) } } return nil }