···
"github.com/bluesky-social/indigo/api/bsky"
"github.com/bluesky-social/indigo/atproto/syntax"
···
"github.com/bluesky-social/jetstream/pkg/client"
"github.com/bluesky-social/jetstream/pkg/models"
"github.com/cornelk/hashmap"
17
+
"github.com/google/uuid"
"github.com/gorilla/websocket"
···
const ListenTypeFollows = "follows"
type SubscriberData struct {
27
-
Conn *websocket.Conn
29
-
ListenTo Set[syntax.DID]
30
-
follows map[syntax.RecordKey]bsky.GraphFollow
28
+
SubscribedTo syntax.DID
29
+
Conn *websocket.Conn
31
+
ListenTo Set[syntax.DID]
33
-
type ListeneeData struct {
34
-
targets *hashmap.Map[syntax.DID, *SubscriberData]
35
-
likes map[syntax.RecordKey]bsky.FeedLike
34
+
type UserData struct {
35
+
targets *hashmap.Map[string, *SubscriberData]
36
+
likes map[syntax.RecordKey]bsky.FeedLike
37
+
follows *hashmap.Map[syntax.RecordKey, bsky.GraphFollow]
38
+
followsCursor atomic.Pointer[string]
type NotificationMessage struct {
···
// storing the subscriber data in both Should Be Fine
// we dont modify subscriber data at the same time in two places
56
-
subscribers = hashmap.New[syntax.DID, *SubscriberData]()
57
-
listeningTo = hashmap.New[syntax.DID, *ListeneeData]()
59
+
subscribers = hashmap.New[string, *SubscriberData]()
60
+
userData = hashmap.New[syntax.DID, *UserData]()
59
-
likeStream *client.Client
60
-
subscriberStream *client.Client
62
+
likeStream *client.Client
63
+
followStream *client.Client
upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
···
func getSubscriberDids() []string {
dids := make([]string, 0, subscribers.Len())
73
-
subscribers.Range(func(s syntax.DID, sd *SubscriberData) bool {
74
-
dids = append(dids, string(s))
76
+
subscribers.Range(func(s string, sd *SubscriberData) bool {
77
+
dids = append(dids, string(sd.SubscribedTo))
80
-
func startListeningTo(sd *SubscriberData, did syntax.DID) {
81
-
ld, _ := listeningTo.GetOrInsert(did, &ListeneeData{
82
-
targets: hashmap.New[syntax.DID, *SubscriberData](),
83
+
func getUserData(did syntax.DID) *UserData {
84
+
ud, _ := userData.GetOrInsert(did, &UserData{
85
+
targets: hashmap.New[string, *SubscriberData](),
likes: make(map[syntax.RecordKey]bsky.FeedLike),
87
+
follows: hashmap.New[syntax.RecordKey, bsky.GraphFollow](),
85
-
ld.targets.Insert(sd.DID, sd)
88
-
func stopListeningTo(subscriberDid, did syntax.DID) {
89
-
if ld, exists := listeningTo.Get(did); exists {
90
-
ld.targets.Del(subscriberDid)
92
+
func startListeningTo(sid string, sd *SubscriberData, did syntax.DID) {
93
+
ud := getUserData(did)
94
+
ud.targets.Insert(sid, sd)
97
+
func stopListeningTo(sid string, did syntax.DID) {
98
+
if ud, exists := userData.Get(did); exists {
···
go startJetstreamLoop(logger, &likeStream, "like_tracker", HandleLikeEvent, getLikeStreamOpts)
98
-
go startJetstreamLoop(logger, &subscriberStream, "subscriber", HandleSubscriberEvent, getSubscriberStreamOpts)
107
+
go startJetstreamLoop(logger, &followStream, "subscriber", HandleFollowEvent, getFollowStreamOpts)
r.HandleFunc("/subscribe/{did}", handleSubscribe).Methods("GET")
···
http.Error(w, "not a valid did", http.StatusBadRequest)
125
+
sid := uuid.New().String()
listenType := query.Get("listenTo")
···
listenType = ListenTypeFollows
123
-
logger := logger.With("did", did)
133
+
logger := logger.With("did", did, "subscriberId", sid)
conn, err := upgrader.Upgrade(w, r, nil)
···
155
+
ud := getUserData(did)
148
-
ListenType: listenType,
159
+
ListenType: listenType,
153
-
follows, err := fetchFollows(r.Context(), xrpcClient, did)
164
+
follows, err := fetchFollows(r.Context(), xrpcClient, ud.followsCursor.Load(), did)
logger.Error("error fetching follows", "error", err)
158
-
logger.Info("fetched follows")
159
-
sd.follows = follows
sd.ListenTo = make(Set[syntax.DID])
161
-
for _, follow := range follows {
162
-
sd.ListenTo[syntax.DID(follow.Subject)] = struct{}{}
170
+
if len(follows) > 0 {
171
+
// store cursor for later requests so we dont have to fetch the whole thing again
172
+
ud.followsCursor.Store((*string)(&follows[len(follows)-1].rkey))
173
+
for _, f := range follows {
174
+
ud.follows.Insert(f.rkey, f.follow)
175
+
sd.ListenTo[syntax.DID(f.follow.Subject)] = struct{}{}
178
+
logger.Info("fetched follows")
sd.ListenTo = make(Set[syntax.DID])
···
171
-
subscribers.Set(sd.DID, sd)
186
+
subscribers.Set(sid, sd)
for listenDid := range sd.ListenTo {
173
-
startListeningTo(sd, listenDid)
188
+
startListeningTo(sid, sd, listenDid)
175
-
updateSubscriberStreamOpts()
190
+
updateFollowStreamOpts()
// delete subscriber after we are done
for listenDid := range sd.ListenTo {
179
-
stopListeningTo(sd.DID, listenDid)
194
+
stopListeningTo(sid, listenDid)
181
-
subscribers.Del(sd.DID)
182
-
updateSubscriberStreamOpts()
196
+
subscribers.Del(sid)
197
+
updateFollowStreamOpts()
logger.Info("serving subscriber")
···
// remove all current listens and add the ones the user requested
for listenDid := range sd.ListenTo {
208
-
stopListeningTo(sd.DID, listenDid)
223
+
stopListeningTo(sid, listenDid)
delete(sd.ListenTo, listenDid)
for _, listenDid := range innerMsg.ListenTo {
sd.ListenTo[listenDid] = struct{}{}
213
-
startListeningTo(sd, listenDid)
228
+
startListeningTo(sid, sd, listenDid)
···
225
-
func getSubscriberStreamOpts() models.SubscriberOptionsUpdatePayload {
240
+
func getFollowStreamOpts() models.SubscriberOptionsUpdatePayload {
return models.SubscriberOptionsUpdatePayload{
227
-
WantedCollections: []string{"app.bsky.feed.repost", "app.bsky.graph.follow"},
242
+
WantedCollections: []string{"app.bsky.graph.follow"},
WantedDIDs: getSubscriberDids(),
232
-
func updateSubscriberStreamOpts() {
233
-
opts := getSubscriberStreamOpts()
234
-
err := subscriberStream.SendOptionsUpdate(opts)
247
+
func updateFollowStreamOpts() {
248
+
opts := getFollowStreamOpts()
249
+
err := followStream.SendOptionsUpdate(opts)
236
-
logger.Error("couldnt update subscriber stream opts", "error", err)
251
+
logger.Error("couldnt update follow stream opts", "error", err)
239
-
logger.Info("updated subscriber stream opts", "userCount", len(opts.WantedDIDs))
254
+
logger.Info("updated follow stream opts", "userCount", len(opts.WantedDIDs))
func HandleLikeEvent(ctx context.Context, event *models.Event) error {
···
byDid := syntax.DID(event.Did)
// skip handling event if its not from a source we are listening to
249
-
ld, exists := listeningTo.Get(byDid)
264
+
ud, exists := userData.Get(byDid)
265
+
if !exists || ud.targets.Len() == 0 {
···
259
-
if l, exists := ld.likes[rkey]; exists {
274
+
if l, exists := ud.likes[rkey]; exists {
261
-
defer delete(ld.likes, rkey)
276
+
defer delete(ud.likes, rkey)
logger.Error("like record not found", "rkey", rkey)
···
// store for later when it gets deleted so we can fetch the record
280
-
ld.likes[rkey] = like
295
+
ud.likes[rkey] = like
repostURI := syntax.ATURI(like.Via.Uri)
···
292
-
if sd, exists := ld.targets.Get(reposterDID); exists {
307
+
ud.targets.Range(func(sid string, sd *SubscriberData) bool {
308
+
if sd.SubscribedTo != reposterDID {
notification := NotificationMessage{
···
if err := sd.Conn.WriteJSON(notification); err != nil {
300
-
logger.Error("failed to send notification", "subscriber", sd.DID, "error", err)
319
+
logger.Error("failed to send notification", "subscriber", sd.SubscribedTo, "error", err)
307
-
func HandleSubscriberEvent(ctx context.Context, event *models.Event) error {
327
+
func HandleFollowEvent(ctx context.Context, event *models.Event) error {
if event == nil || event.Commit == nil {
byDid := syntax.DID(event.Did)
313
-
sd, exists := subscribers.Get(byDid)
333
+
ud, exists := userData.Get(byDid)
334
+
if !exists || ud.targets.Len() == 0 {
···
switch event.Commit.Collection {
case "app.bsky.graph.follow":
323
-
// if we arent managing then we dont need to update anything
324
-
if sd.ListenType != ListenTypeFollows {
329
-
if f, exists := sd.follows[rkey]; exists {
345
+
if f, exists := ud.follows.Get(rkey); exists {
logger.Error("follow record not found", "rkey", rkey)
335
-
subjectDid := syntax.DID(r.Subject)
336
-
stopListeningTo(sd.DID, subjectDid)
337
-
delete(sd.ListenTo, subjectDid)
338
-
delete(sd.follows, rkey)
351
+
ud.follows.Del(rkey)
if err := unmarshalEvent(event, &r); err != nil {
354
+
logger.Error("could not unmarshal follow event", "error", err)
357
+
ud.follows.Insert(rkey, r)
359
+
ud.targets.Range(func(sid string, sd *SubscriberData) bool {
360
+
// if we arent managing then we dont need to update anything
361
+
if sd.ListenType != ListenTypeFollows {
subjectDid := syntax.DID(r.Subject)
344
-
sd.ListenTo[subjectDid] = struct{}{}
345
-
sd.follows[rkey] = r
346
-
startListeningTo(sd, subjectDid)
366
+
stopListeningTo(sid, subjectDid)
367
+
delete(sd.ListenTo, subjectDid)
369
+
sd.ListenTo[subjectDid] = struct{}{}
370
+
startListeningTo(sid, sd, subjectDid)