its for when you want to get like notifications for your reposts

feat: the rest of the owl

ptr.pet 95ce39fa 4d536dba

verified
Changed files
+224 -178
+4
go.mod
···
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-retryablehttp v0.7.5 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
+
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-block-format v0.2.0 // indirect
github.com/ipfs/go-cid v0.4.1 // indirect
···
github.com/prometheus/procfs v0.15.1 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/whyrusleeping/cbor-gen v0.2.1-0.20241030202151-b7a6831be65e // indirect
+
gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b // indirect
+
gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
···
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/sys v0.22.0 // indirect
+
golang.org/x/time v0.5.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/protobuf v1.34.2 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
+8
go.sum
···
github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
+
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
+
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
github.com/ipfs/go-block-format v0.2.0 h1:ZqrkxBA2ICbDRbK8KJs/u0O3dlp6gmAuuXUJNiW1Ycs=
···
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+
gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b h1:CzigHMRySiX3drau9C6Q5CAbNIApmLdat5jPMqChvDA=
+
gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b/go.mod h1:/y/V339mxv2sZmYYR64O07VuCpdNZqCTwO8ZcouTMI8=
+
gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02 h1:qwDnMxjkyLmAFgcfgTnfJrmYKWhHnci3GjDqcZp1M3Q=
+
gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02/go.mod h1:JTnUj0mpYiAsuZLmKjTx/ex3AtMowcCgnE7YNyCEP0I=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 h1:aFJWCqJMNjENlcleuuOkGAPH82y0yULBScfXcIEdS24=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1/go.mod h1:sEGXWArGqc3tVa+ekntsN65DmVbVeW+7lTKTjZF3/Fo=
go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc=
···
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
+
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+59
jetstream.go
···
+
package main
+
+
import (
+
"context"
+
"log/slog"
+
"time"
+
+
"github.com/bluesky-social/jetstream/pkg/client"
+
"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
+
"github.com/bluesky-social/jetstream/pkg/models"
+
)
+
+
type HandleEvent func(context.Context, *models.Event) error
+
+
func startJetstreamLoop(logger *slog.Logger, outStream **client.Client, name string, handleEvent HandleEvent, optsFn func() models.SubscriberOptionsUpdatePayload) {
+
for {
+
stream, startFn, err := startJetstreamClient(name, optsFn(), handleEvent)
+
*outStream = stream
+
if startFn != nil {
+
err = startFn()
+
}
+
if err != nil {
+
logger.Error("stream failed", "name", name, "error", err)
+
}
+
}
+
}
+
+
func startJetstreamClient(name string, opts models.SubscriberOptionsUpdatePayload, handleEvent HandleEvent) (*client.Client, func() error, error) {
+
ctx := context.Background()
+
+
config := client.DefaultClientConfig()
+
config.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
+
config.Compress = true
+
config.WantedCollections = opts.WantedCollections
+
config.WantedDids = opts.WantedDIDs
+
config.RequireHello = len(config.WantedDids) == 0
+
+
scheduler := sequential.NewScheduler(name, logger, handleEvent)
+
+
c, err := client.NewClient(config, logger, scheduler)
+
if err != nil {
+
logger.Error("failed to create jetstream client", "name", name, "error", err)
+
return nil, nil, err
+
}
+
+
startFn := func() error {
+
cursor := time.Now().UnixMicro()
+
+
logger.Info("starting jetstream client", "name", name, "collections", opts.WantedCollections, "wanted_dids", len(opts.WantedDIDs))
+
if err := c.ConnectAndRead(ctx, &cursor); err != nil {
+
logger.Error("jetstream client failed", "name", name, "error", err)
+
return err
+
}
+
+
return nil
+
}
+
+
return c, startFn, nil
+
}
+87 -178
main.go
···
import (
"context"
"encoding/json"
-
"fmt"
"log"
"log/slog"
"net/http"
"sync"
-
"time"
-
"github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/api/bsky"
"github.com/bluesky-social/indigo/xrpc"
"github.com/bluesky-social/jetstream/pkg/client"
-
"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
"github.com/bluesky-social/jetstream/pkg/models"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
···
// Data structures
type SubscriberData struct {
-
DID string
-
Conn *websocket.Conn
-
Follows Set[string]
-
Reposts Set[string]
+
DID string
+
Conn *websocket.Conn
+
ListenTo Set[string]
+
Reposts Set[string]
}
type NotificationMessage struct {
···
likeStream *client.Client
subscriberStream *client.Client
-
xrpcClient *xrpc.Client
-
upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
···
logger *slog.Logger
)
-
func main() {
-
logger = slog.Default()
+
func getFollowsDids() []string {
+
subscribersMux.RLock()
+
defer subscribersMux.RUnlock()
-
xrpcClient = &xrpc.Client{
-
Client: &http.Client{
-
Timeout: 30 * time.Second,
-
},
-
Host: "https://bsky.social",
+
var dids []string
+
for _, subscriber := range subscribers {
+
for follow, _ := range subscriber.ListenTo {
+
dids = append(dids, follow)
+
}
}
-
if err := initializeJetstreams(); err != nil {
-
log.Fatalf("cannot start jetstream: %s", err)
+
return dids
+
}
+
+
func getSubscriberDids() []string {
+
subscribersMux.RLock()
+
defer subscribersMux.RUnlock()
+
+
var dids []string
+
for did := range subscribers {
+
dids = append(dids, did)
}
+
return dids
+
}
+
+
func main() {
+
logger = slog.Default()
+
+
go likeStreamLoop(logger)
+
go subscriberStreamLoop(logger)
+
r := mux.NewRouter()
r.HandleFunc("/subscribe/{did}", handleSubscribe).Methods("GET")
···
vars := mux.Vars(r)
did := vars["did"]
+
logger = logger.With("did", did)
+
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logger.Error("WebSocket upgrade failed", "error", err)
···
}
defer conn.Close()
-
logger.Info("New subscriber", "did", did)
+
logger.Info("new subscriber")
-
follows, err := fetchFollows(r.Context(), did)
+
pdsURI, err := findUserPDS(r.Context(), did)
if err != nil {
-
logger.Error("Error fetching follows", "did", did, "error", err)
+
logger.Error("cant resolve user pds", "error", err)
return
}
+
logger = logger.With("pds", pdsURI)
-
reposts, err := fetchReposts(r.Context(), did)
+
xrpcClient := &xrpc.Client{
+
Host: pdsURI,
+
}
+
// todo: implement skipping fetching follows and allow specifying users to listen to via websocket
+
follows, err := fetchFollows(r.Context(), xrpcClient, did)
+
if err != nil {
+
logger.Error("error fetching follows", "error", err)
+
return
+
}
+
logger.Info("fetched follows")
+
reposts, err := fetchReposts(r.Context(), xrpcClient, did)
if err != nil {
-
logger.Error("Error fetching reposts", "did", did, "error", err)
+
logger.Error("error fetching reposts", "error", err)
return
}
+
logger.Info("fetched reposts")
-
// Store subscriber data
subscriber := &SubscriberData{
-
DID: did,
-
Conn: conn,
-
Follows: follows,
-
Reposts: reposts,
+
DID: did,
+
Conn: conn,
+
// use user follows as default listen to
+
ListenTo: follows,
+
Reposts: reposts,
}
subscribersMux.Lock()
subscribers[did] = subscriber
subscribersMux.Unlock()
updateSubscriberStreamOpts()
+
updateLikeStreamOpts()
// delete subscriber after we are done
defer func() {
subscribersMux.Lock()
delete(subscribers, did)
subscribersMux.Unlock()
updateSubscriberStreamOpts()
+
updateLikeStreamOpts()
}()
-
for {
-
_, _, err := conn.ReadMessage()
-
if err != nil {
-
logger.Info("WebSocket connection closed", "did", did, "error", err)
-
break
-
}
-
}
-
}
-
-
func fetchReposts(ctx context.Context, did string) (Set[string], error) {
-
all := make(Set[string])
-
cursor := ""
+
logger.Info("serving subscriber")
for {
-
out, err := atproto.RepoListRecords(ctx, &xrpc.Client{}, "app.bsky.feed.repost", cursor, 100, did, false)
+
_, _, err := conn.ReadMessage()
if err != nil {
-
return nil, err
-
}
-
-
for _, record := range out.Records {
-
all[record.Uri] = struct{}{}
-
}
-
-
if out.Cursor == nil || *out.Cursor == "" {
+
logger.Info("WebSocket connection closed", "error", err)
break
}
-
cursor = *out.Cursor
}
-
-
return all, nil
-
}
-
-
func fetchFollows(ctx context.Context, did string) (Set[string], error) {
-
all := make(Set[string])
-
cursor := ""
-
-
for {
-
out, err := bsky.GraphGetFollows(ctx, &xrpc.Client{}, did, cursor, 100)
-
if err != nil {
-
return nil, err
-
}
-
-
for _, record := range out.Follows {
-
all[record.Did] = struct{}{}
-
}
-
-
if out.Cursor == nil || *out.Cursor == "" {
-
break
-
}
-
cursor = *out.Cursor
-
}
-
-
return all, nil
-
}
-
-
func initializeJetstreams() error {
-
if err := startLikeClient(); err != nil {
-
return fmt.Errorf("like stream: %w", err)
-
}
-
if err := startSubscriberClient(); err != nil {
-
return fmt.Errorf("subscriber stream: %w", err)
-
}
-
return nil
}
func getLikeStreamOpts() models.SubscriberOptionsUpdatePayload {
···
}
func updateLikeStreamOpts() {
-
err := likeStream.SendOptionsUpdate(getLikeStreamOpts())
-
if err != nil {
-
// reinit like stream
-
}
-
}
-
-
func updateSubscriberStreamOpts() {
-
err := subscriberStream.SendOptionsUpdate(getSubscriberStreamOpts())
-
if err != nil {
-
// reinit subscriber stream
-
}
-
}
-
-
func startLikeClient() error {
opts := getLikeStreamOpts()
-
if len(opts.WantedDIDs) == 0 {
-
return nil // No follows to track
-
}
-
-
handler := &likeHandler{}
-
var err error
-
likeStream, err = startJetstreamClient("like_tracker", opts, handler.HandleEvent)
+
err := likeStream.SendOptionsUpdate(opts)
if err != nil {
-
return err
+
logger.Error("couldnt update like stream opts", "error", err)
+
return
}
-
-
return nil
+
logger.Info("updated like stream opts", "requestedDids", len(opts.WantedDIDs))
}
-
func startSubscriberClient() error {
+
func updateSubscriberStreamOpts() {
opts := getSubscriberStreamOpts()
-
if len(opts.WantedDIDs) == 0 {
-
return nil // No subscribers to track
-
}
-
-
handler := &subscriberHandler{}
-
var err error
-
subscriberStream, err = startJetstreamClient("subscriber", opts, handler.HandleEvent)
+
err := subscriberStream.SendOptionsUpdate(opts)
if err != nil {
-
return err
+
logger.Error("couldnt update subscriber stream opts", "error", err)
+
return
}
-
-
return nil
+
logger.Info("updated subscriber stream opts", "userCount", len(opts.WantedDIDs))
}
-
func startJetstreamClient(name string, opts models.SubscriberOptionsUpdatePayload, handleEvent func(context.Context, *models.Event) error) (*client.Client, error) {
-
ctx := context.Background()
-
-
config := client.DefaultClientConfig()
-
config.WebsocketURL = "wss://jetstream.atproto.tools/subscribe"
-
config.Compress = true
-
config.WantedCollections = opts.WantedCollections
-
config.WantedDids = opts.WantedDIDs
-
-
scheduler := sequential.NewScheduler(name, logger, handleEvent)
-
-
c, err := client.NewClient(config, logger, scheduler)
-
if err != nil {
-
logger.Error("Failed to create client", "name", name, "error", err)
-
return nil, err
-
}
-
-
cursor := time.Now().UnixMicro()
-
-
logger.Info("Starting client", "name", name, "collections", opts.WantedCollections, "wanted_dids", len(opts.WantedDIDs))
-
if err := c.ConnectAndRead(ctx, &cursor); err != nil {
-
logger.Error("Client failed", "name", name, "error", err)
-
return nil, err
-
}
-
-
return c, nil
+
func likeStreamLoop(logger *slog.Logger) {
+
startJetstreamLoop(logger, &likeStream, "like_tracker", HandleLikeEvent, getLikeStreamOpts)
}
-
func getFollowsDids() []string {
-
subscribersMux.RLock()
-
defer subscribersMux.RUnlock()
-
-
var dids []string
-
for _, subscriber := range subscribers {
-
for follow, _ := range subscriber.Follows {
-
dids = append(dids, follow)
-
}
-
}
-
-
return dids
+
func subscriberStreamLoop(logger *slog.Logger) {
+
startJetstreamLoop(logger, &subscriberStream, "subscriber", HandleSubscriberEvent, getSubscriberStreamOpts)
}
-
func getSubscriberDids() []string {
-
subscribersMux.RLock()
-
defer subscribersMux.RUnlock()
-
-
var dids []string
-
for did := range subscribers {
-
dids = append(dids, did)
+
func HandleLikeEvent(ctx context.Context, event *models.Event) error {
+
if event == nil || event.Commit == nil || len(event.Commit.Record) == 0 {
+
return nil
}
-
return dids
-
}
-
-
type likeHandler struct{}
-
-
func (h *likeHandler) HandleEvent(ctx context.Context, event *models.Event) error {
var like bsky.FeedLike
if err := json.Unmarshal(event.Commit.Record, &like); err != nil {
logger.Error("Failed to unmarshal like", "error", err)
···
return nil
}
-
type subscriberHandler struct{}
+
func HandleSubscriberEvent(ctx context.Context, event *models.Event) error {
+
if event == nil || event.Commit == nil {
+
return nil
+
}
-
func (h *subscriberHandler) HandleEvent(ctx context.Context, event *models.Event) error {
switch event.Commit.Collection {
case "app.bsky.feed.repost":
modifySubscribersWithEvent(
···
case "app.bsky.graph.follow":
modifySubscribersWithEvent(
event,
-
func(s *SubscriberData, r bsky.GraphFollow) { delete(s.Follows, r.Subject) },
+
func(s *SubscriberData, r bsky.GraphFollow) { delete(s.ListenTo, r.Subject) },
func(s *SubscriberData, r bsky.GraphFollow) {
-
s.Follows[r.Subject] = struct{}{}
+
s.ListenTo[r.Subject] = struct{}{}
},
)
-
updateLikeStreamOpts()
}
return nil
···
type ModifyFunc[v any] func(*SubscriberData, v)
func modifySubscribersWithEvent[v any](event *models.Event, onDelete ModifyFunc[v], onUpdate ModifyFunc[v]) error {
+
if len(event.Commit.Record) == 0 {
+
return nil
+
}
+
var data v
if err := json.Unmarshal(event.Commit.Record, &data); err != nil {
-
logger.Error("Failed to unmarshal repost", "error", err)
+
logger.Error("Failed to unmarshal repost", "error", err, "raw", event.Commit.Record)
return nil
}
+66
xrpc.go
···
+
package main
+
+
import (
+
"context"
+
"encoding/json"
+
"fmt"
+
+
"github.com/bluesky-social/indigo/api/atproto"
+
"github.com/bluesky-social/indigo/api/bsky"
+
"github.com/bluesky-social/indigo/atproto/identity"
+
"github.com/bluesky-social/indigo/atproto/syntax"
+
"github.com/bluesky-social/indigo/xrpc"
+
)
+
+
func findUserPDS(ctx context.Context, did string) (string, error) {
+
id, err := identity.DefaultDirectory().LookupDID(ctx, syntax.DID(did))
+
if err != nil {
+
return "", err
+
}
+
pdsURI := id.PDSEndpoint()
+
if len(pdsURI) == 0 {
+
return "", fmt.Errorf("no PDS URL was found in identity document")
+
}
+
+
return pdsURI, nil
+
}
+
+
func fetchRecords[v any](ctx context.Context, xrpcClient *xrpc.Client, collection, did string, extractFn func(v) string) (Set[string], error) {
+
all := make(Set[string])
+
cursor := ""
+
+
for {
+
// todo: ratelimits?? idk what this does for those
+
out, err := atproto.RepoListRecords(ctx, xrpcClient, collection, cursor, 100, did, false)
+
if err != nil {
+
return nil, err
+
}
+
+
for _, record := range out.Records {
+
raw, _ := record.Value.MarshalJSON()
+
var val v
+
if err := json.Unmarshal(raw, &val); err != nil {
+
return nil, err
+
}
+
s := extractFn(val)
+
if len(s) > 0 {
+
all[s] = struct{}{}
+
}
+
}
+
+
if out.Cursor == nil || *out.Cursor == "" {
+
break
+
}
+
cursor = *out.Cursor
+
}
+
+
return all, nil
+
}
+
+
func fetchReposts(ctx context.Context, xrpcClient *xrpc.Client, did string) (Set[string], error) {
+
return fetchRecords(ctx, xrpcClient, "app.bsky.feed.repost", did, func(v bsky.FeedRepost) string { return v.Subject.Uri })
+
}
+
+
func fetchFollows(ctx context.Context, xrpcClient *xrpc.Client, did string) (Set[string], error) {
+
return fetchRecords(ctx, xrpcClient, "app.bsky.graph.follow", did, func(v bsky.GraphFollow) string { return v.Subject })
+
}