···
subscribers = hashmap.New[string, *SubscriberData]()
likeStream *client.Client
subscriberStream *client.Client
···
logger.Info("fetched reposts")
-
subscriber := &SubscriberData{
// use user follows as default listen to
···
-
subscribers.Set(did, subscriber)
updateSubscriberStreamOpts()
// delete subscriber after we are done
updateSubscriberStreamOpts()
···
func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload {
return models.SubscriberOptionsUpdatePayload{
WantedCollections: []string{"app.bsky.feed.like"},
-
WantedDIDs: getFollowsDids(),
···
if err := json.Unmarshal(event.Commit.Record, &like); err != nil {
-
logger.Error("Failed to unmarshal like", "error", err)
-
subscribers.Range(func(s string, sd *SubscriberData) bool {
for repostURI, _ := range sd.Reposts {
// (un)liked a post the subscriber reposted
if like.Subject.Uri == repostURI {
···
if err := sd.Conn.WriteJSON(notification); err != nil {
-
logger.Error("Failed to send notification", "subscriber", sd.DID, "error", err)
···
case "app.bsky.graph.follow":
modifySubscribersWithEvent(
-
func(s *SubscriberData, r bsky.GraphFollow) { delete(s.ListenTo, r.Subject) },
func(s *SubscriberData, r bsky.GraphFollow) {
s.ListenTo[r.Subject] = struct{}{}
···
subscribers = hashmap.New[string, *SubscriberData]()
+
listeningTo = hashmap.New[string, *hashmap.Map[string, *SubscriberData]]()
likeStream *client.Client
subscriberStream *client.Client
···
logger.Info("fetched reposts")
// use user follows as default listen to
···
+
subscribers.Set(sd.DID, sd)
+
for listenDid := range sd.ListenTo {
+
listenTo(sd, listenDid)
updateSubscriberStreamOpts()
// delete subscriber after we are done
+
for listenDid := range sd.ListenTo {
+
stopListeningTo(sd.DID, listenDid)
+
subscribers.Del(sd.DID)
updateSubscriberStreamOpts()
···
+
func listenTo(sd *SubscriberData, did string) {
+
targetDids, _ := listeningTo.GetOrInsert(did, hashmap.New[string, *SubscriberData]())
+
targetDids.Insert(sd.DID, sd)
+
func stopListeningTo(subscriberDid, did string) {
+
if targetDids, exists := listeningTo.Get(did); exists {
+
targetDids.Del(subscriberDid)
func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload {
return models.SubscriberOptionsUpdatePayload{
WantedCollections: []string{"app.bsky.feed.like"},
+
// WantedDIDs: getFollowsDids(),
···
+
// skip handling event if its not from a source we are listening to
+
targets, exists := listeningTo.Get(event.Did)
if err := json.Unmarshal(event.Commit.Record, &like); err != nil {
+
logger.Error("failed to unmarshal like", "error", err)
+
targets.Range(func(s string, sd *SubscriberData) bool {
for repostURI, _ := range sd.Reposts {
// (un)liked a post the subscriber reposted
if like.Subject.Uri == repostURI {
···
if err := sd.Conn.WriteJSON(notification); err != nil {
+
logger.Error("failed to send notification", "subscriber", sd.DID, "error", err)
···
case "app.bsky.graph.follow":
modifySubscribersWithEvent(
+
func(s *SubscriberData, r bsky.GraphFollow) {
+
delete(s.ListenTo, r.Subject)
+
stopListeningTo(s.DID, r.Subject)
func(s *SubscriberData, r bsky.GraphFollow) {
s.ListenTo[r.Subject] = struct{}{}