its for when you want to get like notifications for your reposts

feat: rest of the rest of the owl

- use like records Via field (dusk is stupid)
- keep records so we can actually detect them being deleted (like records that were there before the server started arent counted)
- overall make the code betterer

ptr.pet 4040f147 cde45e87

verified
Changed files
+176 -138
+2 -5
jetstream.go
···
import (
"context"
"log/slog"
-
"time"
"github.com/bluesky-social/jetstream/pkg/client"
"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
···
config.Compress = true
config.WantedCollections = opts.WantedCollections
config.WantedDids = opts.WantedDIDs
-
config.RequireHello = len(config.WantedDids) == 0
+
config.RequireHello = false
scheduler := sequential.NewScheduler(name, logger, handleEvent)
···
}
startFn := func() error {
-
cursor := time.Now().UnixMicro()
-
logger.Info("starting jetstream client", "name", name, "collections", opts.WantedCollections, "wanted_dids", len(opts.WantedDIDs))
-
if err := c.ConnectAndRead(ctx, &cursor); err != nil {
+
if err := c.ConnectAndRead(ctx, nil); err != nil {
logger.Error("jetstream client failed", "name", name, "error", err)
return err
}
+129 -117
main.go
···
"net/http"
"github.com/bluesky-social/indigo/api/bsky"
+
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/indigo/xrpc"
"github.com/bluesky-social/jetstream/pkg/client"
"github.com/bluesky-social/jetstream/pkg/models"
···
const ListenTypeFollows = "follows"
type SubscriberData struct {
-
DID string
+
DID syntax.DID
Conn *websocket.Conn
ListenType string
-
ListenTo Set[string]
-
Reposts Set[string]
+
ListenTo Set[syntax.DID]
+
follows map[syntax.RecordKey]bsky.GraphFollow
+
}
+
+
type ListeneeData struct {
+
targets *hashmap.Map[syntax.DID, *SubscriberData]
+
likes map[syntax.RecordKey]bsky.FeedLike
}
type NotificationMessage struct {
-
Liked bool `json:"liked"`
-
ByDid string `json:"did"`
-
RepostURI string `json:"repost_uri"`
+
Liked bool `json:"liked"`
+
ByDid syntax.DID `json:"did"`
+
RepostURI syntax.ATURI `json:"repost_uri"`
}
type SubscriberMessage struct {
···
}
type SubscriberUpdateListenTo struct {
-
ListenTo []string `json:"listen_to"`
+
ListenTo []syntax.DID `json:"listen_to"`
}
var (
// storing the subscriber data in both Should Be Fine
// we dont modify subscriber data at the same time in two places
-
subscribers = hashmap.New[string, *SubscriberData]()
-
listeningTo = hashmap.New[string, *hashmap.Map[string, *SubscriberData]]()
+
subscribers = hashmap.New[syntax.DID, *SubscriberData]()
+
listeningTo = hashmap.New[syntax.DID, *ListeneeData]()
likeStream *client.Client
subscriberStream *client.Client
···
func getSubscriberDids() []string {
dids := make([]string, 0, subscribers.Len())
-
subscribers.Range(func(s string, sd *SubscriberData) bool {
-
dids = append(dids, s)
+
subscribers.Range(func(s syntax.DID, sd *SubscriberData) bool {
+
dids = append(dids, string(s))
return true
})
return dids
}
-
func listenTo(sd *SubscriberData, did string) {
-
targetDids, _ := listeningTo.GetOrInsert(did, hashmap.New[string, *SubscriberData]())
-
targetDids.Insert(sd.DID, sd)
+
func startListeningTo(sd *SubscriberData, did syntax.DID) {
+
ld, _ := listeningTo.GetOrInsert(did, &ListeneeData{
+
targets: hashmap.New[syntax.DID, *SubscriberData](),
+
likes: make(map[syntax.RecordKey]bsky.FeedLike),
+
})
+
ld.targets.Insert(sd.DID, sd)
}
-
func stopListeningTo(subscriberDid, did string) {
-
if targetDids, exists := listeningTo.Get(did); exists {
-
targetDids.Del(subscriberDid)
+
func stopListeningTo(subscriberDid, did syntax.DID) {
+
if ld, exists := listeningTo.Get(did); exists {
+
ld.targets.Del(subscriberDid)
}
}
···
func handleSubscribe(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
-
did := vars["did"]
+
did, err := syntax.ParseDID(vars["did"])
+
if err != nil {
+
http.Error(w, "not a valid did", http.StatusBadRequest)
+
return
+
}
query := r.URL.Query()
-
// "follows", everything else is considered as "none"
listenType := query.Get("listenTo")
if len(listenType) == 0 {
listenType = ListenTypeFollows
}
-
logger = logger.With("did", did)
+
logger := logger.With("did", did)
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
···
Host: pdsURI,
}
-
var subbedTo Set[string]
+
sd := &SubscriberData{
+
DID: did,
+
Conn: conn,
+
ListenType: listenType,
+
}
switch listenType {
case ListenTypeFollows:
···
return
}
logger.Info("fetched follows")
-
subbedTo = follows
+
sd.follows = follows
+
sd.ListenTo = make(Set[syntax.DID])
+
for _, follow := range follows {
+
sd.ListenTo[syntax.DID(follow.Subject)] = struct{}{}
+
}
case ListenTypeNone:
-
subbedTo = make(Set[string])
+
sd.ListenTo = make(Set[syntax.DID])
default:
-
logger.Error("invalid listen type", "requestedType", listenType)
-
return
-
}
-
-
reposts, err := fetchReposts(r.Context(), xrpcClient, did)
-
if err != nil {
-
logger.Error("error fetching reposts", "error", err)
+
http.Error(w, "invalid listen type", http.StatusBadRequest)
return
}
-
logger.Info("fetched reposts")
-
-
sd := &SubscriberData{
-
DID: did,
-
Conn: conn,
-
ListenType: listenType,
-
ListenTo: subbedTo,
-
Reposts: reposts,
-
}
subscribers.Set(sd.DID, sd)
for listenDid := range sd.ListenTo {
-
listenTo(sd, listenDid)
+
startListeningTo(sd, listenDid)
}
-
updateSubscriberStreamOpts()
-
updateLikeStreamOpts()
// delete subscriber after we are done
defer func() {
for listenDid := range sd.ListenTo {
stopListeningTo(sd.DID, listenDid)
}
subscribers.Del(sd.DID)
-
updateSubscriberStreamOpts()
-
updateLikeStreamOpts()
}()
logger.Info("serving subscriber")
···
}
for _, listenDid := range innerMsg.ListenTo {
sd.ListenTo[listenDid] = struct{}{}
-
listenTo(sd, listenDid)
+
startListeningTo(sd, listenDid)
}
}
}
···
func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload {
return models.SubscriberOptionsUpdatePayload{
WantedCollections: []string{"app.bsky.feed.like"},
-
// WantedDIDs: getFollowsDids(),
}
}
···
}
}
-
func updateLikeStreamOpts() {
-
opts := getLikeStreamOpts()
-
err := likeStream.SendOptionsUpdate(opts)
-
if err != nil {
-
logger.Error("couldnt update like stream opts", "error", err)
-
return
-
}
-
logger.Info("updated like stream opts", "requestedDids", len(opts.WantedDIDs))
-
}
-
func updateSubscriberStreamOpts() {
opts := getSubscriberStreamOpts()
err := subscriberStream.SendOptionsUpdate(opts)
···
}
func HandleLikeEvent(ctx context.Context, event *models.Event) error {
-
if event == nil || event.Commit == nil || len(event.Commit.Record) == 0 {
+
if event == nil || event.Commit == nil {
return nil
}
+
byDid := syntax.DID(event.Did)
// skip handling event if its not from a source we are listening to
-
targets, exists := listeningTo.Get(event.Did)
+
ld, exists := listeningTo.Get(byDid)
if !exists {
return nil
}
+
deleted := event.Commit.Operation == models.CommitOperationDelete
+
rkey := syntax.RecordKey(event.Commit.RKey)
+
var like bsky.FeedLike
-
if err := json.Unmarshal(event.Commit.Record, &like); err != nil {
-
logger.Error("failed to unmarshal like", "error", err)
+
if deleted {
+
if l, exists := ld.likes[rkey]; exists {
+
like = l
+
defer delete(ld.likes, rkey)
+
} else {
+
logger.Error("like record not found", "rkey", rkey)
+
return nil
+
}
+
} else {
+
if err := json.Unmarshal(event.Commit.Record, &like); err != nil {
+
logger.Error("failed to unmarshal like", "error", err)
+
return nil
+
}
+
}
+
+
// if there is no via it means its not a repost anyway
+
if like.Via == nil {
return nil
}
-
targets.Range(func(s string, sd *SubscriberData) bool {
-
for repostURI, _ := range sd.Reposts {
-
// (un)liked a post the subscriber reposted
-
if like.Subject.Uri == repostURI {
-
notification := NotificationMessage{
-
Liked: event.Commit.Operation != models.CommitOperationDelete,
-
ByDid: event.Did,
-
RepostURI: repostURI,
-
}
+
// store for later when it gets deleted so we can fetch the record
+
if !deleted {
+
ld.likes[rkey] = like
+
}
-
if err := sd.Conn.WriteJSON(notification); err != nil {
-
logger.Error("failed to send notification", "subscriber", sd.DID, "error", err)
-
}
-
}
+
repostURI := syntax.ATURI(like.Via.Uri)
+
// if not a repost we dont care
+
if repostURI.Collection() != "app.bsky.feed.repost" {
+
return nil
+
}
+
reposterDID, err := repostURI.Authority().AsDID()
+
if err != nil {
+
return err
+
}
+
if sd, exists := ld.targets.Get(reposterDID); exists {
+
notification := NotificationMessage{
+
Liked: !deleted,
+
ByDid: byDid,
+
RepostURI: repostURI,
}
-
return true
-
})
+
+
if err := sd.Conn.WriteJSON(notification); err != nil {
+
logger.Error("failed to send notification", "subscriber", sd.DID, "error", err)
+
}
+
}
return nil
}
···
return nil
}
+
byDid := syntax.DID(event.Did)
+
sd, exists := subscribers.Get(byDid)
+
if !exists {
+
return nil
+
}
+
+
deleted := event.Commit.Operation == models.CommitOperationDelete
+
rkey := syntax.RecordKey(event.Commit.RKey)
+
switch event.Commit.Collection {
-
case "app.bsky.feed.repost":
-
modifySubscribersWithEvent(
-
event,
-
func(s *SubscriberData, r bsky.FeedRepost, deleted bool) {
-
if deleted {
-
delete(s.Reposts, r.Subject.Uri)
-
} else {
-
s.Reposts[r.Subject.Uri] = struct{}{}
-
}
-
},
-
)
case "app.bsky.graph.follow":
-
modifySubscribersWithEvent(
-
event,
-
func(s *SubscriberData, r bsky.GraphFollow, deleted bool) {
-
// if we arent managing then we dont need to update anything
-
if s.ListenType != ListenTypeFollows {
-
return
-
}
-
if deleted {
-
stopListeningTo(s.DID, r.Subject)
-
delete(s.ListenTo, r.Subject)
-
} else {
-
s.ListenTo[r.Subject] = struct{}{}
-
listenTo(s, r.Subject)
-
}
-
},
-
)
+
// if we arent managing then we dont need to update anything
+
if sd.ListenType != ListenTypeFollows {
+
return nil
+
}
+
var r bsky.GraphFollow
+
if deleted {
+
if f, exists := sd.follows[rkey]; exists {
+
r = f
+
} else {
+
logger.Error("follow record not found", "rkey", rkey)
+
return nil
+
}
+
subjectDid := syntax.DID(r.Subject)
+
stopListeningTo(sd.DID, subjectDid)
+
delete(sd.ListenTo, subjectDid)
+
delete(sd.follows, rkey)
+
} else {
+
if err := unmarshalEvent(event, &r); err != nil {
+
return err
+
}
+
subjectDid := syntax.DID(r.Subject)
+
sd.ListenTo[subjectDid] = struct{}{}
+
sd.follows[rkey] = r
+
startListeningTo(sd, subjectDid)
+
}
}
return nil
}
-
type ModifyFunc[v any] func(*SubscriberData, v, bool)
-
-
func modifySubscribersWithEvent[v any](event *models.Event, handle ModifyFunc[v]) error {
-
if len(event.Commit.Record) == 0 {
+
func unmarshalEvent[v any](event *models.Event, val *v) error {
+
if err := json.Unmarshal(event.Commit.Record, val); err != nil {
+
logger.Error("failed to unmarshal", "error", err, "raw", event.Commit.Record)
return nil
}
-
-
var data v
-
if err := json.Unmarshal(event.Commit.Record, &data); err != nil {
-
logger.Error("failed to unmarshal repost", "error", err, "raw", event.Commit.Record)
-
return nil
-
}
-
-
if subscriber, exists := subscribers.Get(event.Did); exists {
-
handle(subscriber, data, event.Commit.Operation == models.CommitOperationDelete)
-
}
-
return nil
}
+45 -16
xrpc.go
···
"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/bluesky-social/indigo/xrpc"
+
"github.com/bluesky-social/jetstream/pkg/models"
)
-
func findUserPDS(ctx context.Context, did string) (string, error) {
-
id, err := identity.DefaultDirectory().LookupDID(ctx, syntax.DID(did))
+
func findUserPDS(ctx context.Context, did syntax.DID) (string, error) {
+
id, err := identity.DefaultDirectory().LookupDID(ctx, did)
if err != nil {
return "", err
}
···
return pdsURI, nil
}
-
func fetchRecords[v any](ctx context.Context, xrpcClient *xrpc.Client, collection, did string, extractFn func(v) string) (Set[string], error) {
-
all := make(Set[string])
+
func fetchRecord[v any](ctx context.Context, xrpcClient *xrpc.Client, val *v, event *models.Event) error {
+
out, err := atproto.RepoGetRecord(ctx, xrpcClient, "", event.Commit.Collection, event.Did, event.Commit.RKey)
+
if err != nil {
+
return err
+
}
+
raw, _ := out.Value.MarshalJSON()
+
if err := json.Unmarshal(raw, val); err != nil {
+
return err
+
}
+
return nil
+
}
+
+
func fetchRecords[v any](ctx context.Context, xrpcClient *xrpc.Client, cb func(syntax.ATURI, v), collection string, did syntax.DID) error {
+
if xrpcClient == nil {
+
pdsURI, err := findUserPDS(ctx, did)
+
if err != nil {
+
return err
+
}
+
xrpcClient = &xrpc.Client{
+
Host: pdsURI,
+
}
+
}
+
cursor := ""
for {
// todo: ratelimits?? idk what this does for those
-
out, err := atproto.RepoListRecords(ctx, xrpcClient, collection, cursor, 100, did, false)
+
out, err := atproto.RepoListRecords(ctx, xrpcClient, collection, cursor, 100, string(did), false)
if err != nil {
-
return nil, err
+
return err
}
for _, record := range out.Records {
raw, _ := record.Value.MarshalJSON()
var val v
if err := json.Unmarshal(raw, &val); err != nil {
-
return nil, err
+
return err
}
-
s := extractFn(val)
-
if len(s) > 0 {
-
all[s] = struct{}{}
-
}
+
cb(syntax.ATURI(record.Uri), val)
}
if out.Cursor == nil || *out.Cursor == "" {
break
}
cursor = *out.Cursor
+
+
break
}
-
return all, nil
+
return nil
}
-
func fetchReposts(ctx context.Context, xrpcClient *xrpc.Client, did string) (Set[string], error) {
-
return fetchRecords(ctx, xrpcClient, "app.bsky.feed.repost", did, func(v bsky.FeedRepost) string { return v.Subject.Uri })
+
func fetchFollows(ctx context.Context, xrpcClient *xrpc.Client, did syntax.DID) (map[syntax.RecordKey]bsky.GraphFollow, error) {
+
out := make(map[syntax.RecordKey]bsky.GraphFollow)
+
fetchRecords(ctx, xrpcClient, func(uri syntax.ATURI, f bsky.GraphFollow) { out[uri.RecordKey()] = f }, "app.bsky.graph.follow", did)
+
return out, nil
}
-
func fetchFollows(ctx context.Context, xrpcClient *xrpc.Client, did string) (Set[string], error) {
-
return fetchRecords(ctx, xrpcClient, "app.bsky.graph.follow", did, func(v bsky.GraphFollow) string { return v.Subject })
+
func fetchRepostLikes(ctx context.Context, xrpcClient *xrpc.Client, did syntax.DID) (map[syntax.RecordKey]bsky.FeedLike, error) {
+
out := make(map[syntax.RecordKey]bsky.FeedLike)
+
fetchRecords(ctx, xrpcClient, func(uri syntax.ATURI, f bsky.FeedLike) {
+
if f.Via != nil && syntax.ATURI(f.Via.Uri).Collection() == "app.bsky.feed.repost" {
+
out[uri.RecordKey()] = f
+
}
+
}, "app.bsky.feed.like", did)
+
return out, nil
}