···
type Set[T comparable] map[T]struct{}
21
+
const ListenTypeNone = "none"
22
+
const ListenTypeFollows = "follows"
type SubscriberData struct {
23
-
Conn *websocket.Conn
24
-
ListenTo Set[string]
26
+
Conn *websocket.Conn
28
+
ListenTo Set[string]
type NotificationMessage struct {
···
RepostURI string `json:"repost_uri"`
38
+
type SubscriberMessage struct {
39
+
Type string `json:"type"`
40
+
Content json.RawMessage `json:"content"`
43
+
type SubscriberUpdateListenTo struct {
44
+
ListenTo []string `json:"listen_to"`
// storing the subscriber data in both Should Be Fine
// we dont modify subscriber data at the same time in two places
···
75
-
go likeStreamLoop(logger)
76
-
go subscriberStreamLoop(logger)
88
+
go startJetstreamLoop(logger, &likeStream, "like_tracker", HandleLikeEvent, getLikeStreamOpts)
89
+
go startJetstreamLoop(logger, &subscriberStream, "subscriber", HandleSubscriberEvent, getSubscriberStreamOpts)
r.HandleFunc("/subscribe/{did}", handleSubscribe).Methods("GET")
81
-
log.Println("Server starting on :8080")
94
+
log.Println("server starting on :8080")
if err := http.ListenAndServe(":8080", r); err != nil {
log.Fatalf("error while serving: %s", err)
···
104
+
query := r.URL.Query()
105
+
// "follows", everything else is considered as "none"
106
+
listenType := query.Get("listenTo")
107
+
if len(listenType) == 0 {
108
+
listenType = ListenTypeFollows
logger = logger.With("did", did)
conn, err := upgrader.Upgrade(w, r, nil)
···
xrpcClient := &xrpc.Client{
112
-
// todo: implement skipping fetching follows and allow specifying users to listen to via websocket
113
-
follows, err := fetchFollows(r.Context(), xrpcClient, did)
115
-
logger.Error("error fetching follows", "error", err)
133
+
var subbedTo Set[string]
135
+
switch listenType {
136
+
case ListenTypeFollows:
137
+
follows, err := fetchFollows(r.Context(), xrpcClient, did)
139
+
logger.Error("error fetching follows", "error", err)
142
+
logger.Info("fetched follows")
144
+
case ListenTypeNone:
145
+
subbedTo = make(Set[string])
147
+
logger.Error("invalid listen type", "requestedType", listenType)
118
-
logger.Info("fetched follows")
reposts, err := fetchReposts(r.Context(), xrpcClient, did)
logger.Error("error fetching reposts", "error", err)
···
logger.Info("fetched reposts")
129
-
// use user follows as default listen to
161
+
ListenType: listenType,
162
+
ListenTo: subbedTo,
subscribers.Set(sd.DID, sd)
···
logger.Info("serving subscriber")
155
-
_, _, err := conn.ReadMessage()
187
+
var msg SubscriberMessage
188
+
err := conn.ReadJSON(&msg)
logger.Info("WebSocket connection closed", "error", err)
194
+
case "update_listen_to":
195
+
// only allow this if we arent managing listen to
196
+
if sd.ListenType != ListenTypeNone {
200
+
var innerMsg SubscriberUpdateListenTo
201
+
if err := json.Unmarshal(msg.Content, &innerMsg); err != nil {
202
+
logger.Info("invalid message", "error", err)
205
+
// remove all current listens and add the ones the user requested
206
+
for listenDid := range sd.ListenTo {
207
+
stopListeningTo(sd.DID, listenDid)
208
+
delete(sd.ListenTo, listenDid)
210
+
for _, listenDid := range innerMsg.ListenTo {
211
+
sd.ListenTo[listenDid] = struct{}{}
212
+
listenTo(sd, listenDid)
···
logger.Info("updated subscriber stream opts", "userCount", len(opts.WantedDIDs))
197
-
func likeStreamLoop(logger *slog.Logger) {
198
-
startJetstreamLoop(logger, &likeStream, "like_tracker", HandleLikeEvent, getLikeStreamOpts)
201
-
func subscriberStreamLoop(logger *slog.Logger) {
202
-
startJetstreamLoop(logger, &subscriberStream, "subscriber", HandleSubscriberEvent, getSubscriberStreamOpts)
func HandleLikeEvent(ctx context.Context, event *models.Event) error {
···
case "app.bsky.feed.repost":
modifySubscribersWithEvent(
252
-
func(s *SubscriberData, r bsky.FeedRepost) { delete(s.Reposts, r.Subject.Uri) },
253
-
func(s *SubscriberData, r bsky.FeedRepost) {
254
-
s.Reposts[r.Subject.Uri] = struct{}{}
299
+
func(s *SubscriberData, r bsky.FeedRepost, deleted bool) {
301
+
delete(s.Reposts, r.Subject.Uri)
303
+
s.Reposts[r.Subject.Uri] = struct{}{}
case "app.bsky.graph.follow":
modifySubscribersWithEvent(
260
-
func(s *SubscriberData, r bsky.GraphFollow) {
261
-
delete(s.ListenTo, r.Subject)
262
-
stopListeningTo(s.DID, r.Subject)
264
-
func(s *SubscriberData, r bsky.GraphFollow) {
265
-
s.ListenTo[r.Subject] = struct{}{}
266
-
listenTo(s, r.Subject)
310
+
func(s *SubscriberData, r bsky.GraphFollow, deleted bool) {
311
+
// if we arent managing then we dont need to update anything
312
+
if s.ListenType != ListenTypeFollows {
316
+
stopListeningTo(s.DID, r.Subject)
317
+
delete(s.ListenTo, r.Subject)
319
+
s.ListenTo[r.Subject] = struct{}{}
320
+
listenTo(s, r.Subject)
···
274
-
type ModifyFunc[v any] func(*SubscriberData, v)
329
+
type ModifyFunc[v any] func(*SubscriberData, v, bool)
276
-
func modifySubscribersWithEvent[v any](event *models.Event, onDelete ModifyFunc[v], onUpdate ModifyFunc[v]) error {
331
+
func modifySubscribersWithEvent[v any](event *models.Event, handle ModifyFunc[v]) error {
if len(event.Commit.Record) == 0 {
if err := json.Unmarshal(event.Commit.Record, &data); err != nil {
283
-
logger.Error("Failed to unmarshal repost", "error", err, "raw", event.Commit.Record)
338
+
logger.Error("failed to unmarshal repost", "error", err, "raw", event.Commit.Record)
if subscriber, exists := subscribers.Get(event.Did); exists {
288
-
if event.Commit.Operation == models.CommitOperationDelete {
289
-
onDelete(subscriber, data)
291
-
onUpdate(subscriber, data)
343
+
handle(subscriber, data, event.Commit.Operation == models.CommitOperationDelete)