forked from tangled.org/core
this repo has no description

knotserver: switch to using official jetstream pkg

Changed files
+161 -278
knotserver
+13 -8
go.mod
···
github.com/Blank-Xu/sql-adapter v1.1.1
github.com/bluekeyes/go-gitdiff v0.8.0
github.com/bluesky-social/indigo v0.0.0-20250123072624-9e3b84fdbb20
+
github.com/bluesky-social/jetstream v0.0.0-20241210005130-ea96859b93d1
github.com/casbin/casbin/v2 v2.103.0
github.com/gliderlabs/ssh v0.3.5
github.com/go-chi/chi/v5 v5.2.0
···
github.com/bmatcuk/doublestar/v4 v4.7.1 // indirect
github.com/carlmjohnson/versioninfo v0.22.5 // indirect
github.com/casbin/govaluate v1.3.0 // indirect
-
github.com/cespare/xxhash/v2 v2.2.0 // indirect
+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudflare/circl v1.4.0 // indirect
github.com/cyphar/filepath-securejoin v0.3.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
···
github.com/go-git/go-billy/v5 v5.5.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
+
github.com/goccy/go-json v0.10.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gorilla/css v1.0.1 // indirect
github.com/gorilla/securecookie v1.1.2 // indirect
···
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
+
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
-
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
···
github.com/pjbgf/sha1cd v0.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f // indirect
-
github.com/prometheus/client_golang v1.17.0 // indirect
-
github.com/prometheus/client_model v0.5.0 // indirect
-
github.com/prometheus/common v0.45.0 // indirect
-
github.com/prometheus/procfs v0.12.0 // indirect
+
github.com/prometheus/client_golang v1.19.1 // indirect
+
github.com/prometheus/client_model v0.6.1 // indirect
+
github.com/prometheus/common v0.54.0 // indirect
+
github.com/prometheus/procfs v0.15.1 // indirect
github.com/sergi/go-diff v1.3.1 // indirect
github.com/skeema/knownhosts v1.3.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
···
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.29.0 // indirect
-
golang.org/x/time v0.3.0 // indirect
-
google.golang.org/protobuf v1.33.0 // indirect
+
golang.org/x/time v0.5.0 // indirect
+
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
···
replace github.com/sergi/go-diff => github.com/sergi/go-diff v1.1.0
replace github.com/go-git/go-git/v5 => github.com/go-git/go-git/v5 v5.6.1
+
+
// from bluesky-social/indigo
+
replace github.com/gocql/gocql => github.com/scylladb/gocql v1.14.4
+22 -18
go.sum
···
github.com/bluekeyes/go-gitdiff v0.8.0/go.mod h1:WWAk1Mc6EgWarCrPFO+xeYlujPu98VuLW3Tu+B/85AE=
github.com/bluesky-social/indigo v0.0.0-20250123072624-9e3b84fdbb20 h1:yHusfYYi8odoCcsI6AurU+dRWb7itHAQNwt3/Rl9Vfs=
github.com/bluesky-social/indigo v0.0.0-20250123072624-9e3b84fdbb20/go.mod h1:Qp4YqWf+AQ3TwQCxV5Ls8O2tXE55zVTGVs3zTmn7BOg=
+
github.com/bluesky-social/jetstream v0.0.0-20241210005130-ea96859b93d1 h1:CFvRtYNSnWRAi/98M3O466t9dYuwtesNbu6FVPymRrA=
+
github.com/bluesky-social/jetstream v0.0.0-20241210005130-ea96859b93d1/go.mod h1:WiYEeyJSdUwqoaZ71KJSpTblemUCpwJfh5oVXplK6T4=
github.com/bmatcuk/doublestar/v4 v4.6.1/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc=
github.com/bmatcuk/doublestar/v4 v4.7.1 h1:fdDeAqgT47acgwd9bd9HxJRDmc9UAmPpc+2m0CXv75Q=
github.com/bmatcuk/doublestar/v4 v4.7.1/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc=
···
github.com/casbin/govaluate v1.2.0/go.mod h1:G/UnbIjZk/0uMNaLwZZmFQrR72tYRZWQkO70si/iR7A=
github.com/casbin/govaluate v1.3.0 h1:VA0eSY0M2lA86dYd5kPPuNZMUD9QkWnOCnavGrw9myc=
github.com/casbin/govaluate v1.3.0/go.mod h1:G/UnbIjZk/0uMNaLwZZmFQrR72tYRZWQkO70si/iR7A=
-
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
-
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
+
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cloudflare/circl v1.1.0/go.mod h1:prBCrKB9DV4poKZY1l9zBXg2QJY7mvgRvtMxxK7fi4I=
github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA=
github.com/cloudflare/circl v1.4.0 h1:BV7h5MgrktNzytKmWjpOtdYrf0lkkbF8YMlBGPhJQrY=
···
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0=
+
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
+
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
···
github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
+
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
+
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
···
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
-
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
-
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/microcosm-cc/bluemonday v1.0.27 h1:MpEUotklkwCSLeH+Qdx1VJgNqLlpY2KXwXFM08ygZfk=
github.com/microcosm-cc/bluemonday v1.0.27/go.mod h1:jFi9vgW+H7c3V0lb6nR74Ib/DIB5OBs92Dimizgw2cA=
github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM=
···
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f h1:VXTQfuJj9vKR4TCkEuWIckKvdHFeJH/huIFJ9/cXOB0=
github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f/go.mod h1:/zvteZs/GwLtCgZ4BL6CBsk9IKIlexP43ObX9AxTqTw=
-
github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
-
github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
-
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
-
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
-
github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM=
-
github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
-
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
-
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
+
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
+
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
+
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
+
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
+
github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8=
+
github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ=
+
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
+
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
-
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
-
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
+
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
+
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
···
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
-
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
-
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
+
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
···
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU=
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
-
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
-
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
+
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
+
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+2 -3
knotserver/handler.go
···
"github.com/go-chi/chi/v5"
"github.com/sotangled/tangled/knotserver/config"
"github.com/sotangled/tangled/knotserver/db"
-
"github.com/sotangled/tangled/knotserver/jsclient"
"github.com/sotangled/tangled/rbac"
)
···
type Handle struct {
c *config.Config
db *db.DB
-
js *jsclient.JetstreamClient
+
jc *JetstreamClient
e *rbac.Enforcer
l *slog.Logger
···
if len(dids) > 0 {
h.knotInitialized = true
close(h.init)
-
h.js.UpdateDids(dids)
+
h.jc.UpdateDids(dids)
}
r.Get("/", h.Index)
+121 -76
knotserver/jetstream.go
···
"net/http"
"net/url"
"strings"
+
"sync"
"time"
+
"github.com/bluesky-social/jetstream/pkg/client"
+
"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
+
"github.com/bluesky-social/jetstream/pkg/models"
"github.com/sotangled/tangled/api/tangled"
"github.com/sotangled/tangled/knotserver/db"
-
"github.com/sotangled/tangled/knotserver/jsclient"
"github.com/sotangled/tangled/log"
)
+
type JetstreamClient struct {
+
cfg *client.ClientConfig
+
client *client.Client
+
reconnectCh chan struct{}
+
mu sync.RWMutex
+
}
+
func (h *Handle) StartJetstream(ctx context.Context) error {
l := h.l.With("component", "jetstream")
ctx = log.IntoContext(ctx, l)
···
return err
}
-
h.js = jsclient.NewJetstreamClient(collections, dids)
-
messages, err := h.js.ReadJetstream(ctx, lastTimeUs)
+
cfg := client.DefaultClientConfig()
+
cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
+
cfg.WantedCollections = collections
+
cfg.WantedDids = dids
+
+
sched := sequential.NewScheduler("knotserver", l, h.processMessages)
+
+
client, err := client.NewClient(cfg, l, sched)
if err != nil {
-
return fmt.Errorf("failed to read from jetstream: %w", err)
+
l.Error("failed to create jetstream client", "error", err)
}
-
go h.processMessages(ctx, messages)
+
jc := &JetstreamClient{
+
cfg: cfg,
+
client: client,
+
reconnectCh: make(chan struct{}),
+
}
+
h.jc = jc
+
+
go func() {
+
for len(h.jc.cfg.WantedDids) == 0 {
+
time.Sleep(time.Second)
+
}
+
h.connectAndRead(ctx, &lastTimeUs)
+
}()
return nil
}
+
func (h *Handle) connectAndRead(ctx context.Context, cursor *int64) {
+
l := log.FromContext(ctx)
+
for {
+
select {
+
case <-h.jc.reconnectCh:
+
l.Info("reconnecting jetstream client")
+
h.jc.client.Scheduler.Shutdown()
+
if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil {
+
l.Error("error reading jetstream", "error", err)
+
}
+
default:
+
if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil {
+
l.Error("error reading jetstream", "error", err)
+
}
+
}
+
}
+
}
+
+
func (j *JetstreamClient) UpdateDids(dids []string) {
+
j.mu.Lock()
+
j.cfg.WantedDids = dids
+
j.mu.Unlock()
+
j.reconnectCh <- struct{}{}
+
}
+
func (h *Handle) getLastTimeUs(ctx context.Context) (int64, error) {
l := log.FromContext(ctx)
lastTimeUs, err := h.db.GetLastTimeUs()
···
return lastTimeUs, nil
}
-
func (h *Handle) processPublicKey(ctx context.Context, did string, record map[string]interface{}) error {
+
func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
l := log.FromContext(ctx)
-
if err := h.db.AddPublicKeyFromRecord(did, record); err != nil {
+
pk := db.PublicKey{
+
Did: did,
+
PublicKey: record,
+
}
+
if err := h.db.AddPublicKey(pk); err != nil {
l.Error("failed to add public key", "error", err)
return fmt.Errorf("failed to add public key: %w", err)
}
···
return nil
}
+
func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
+
l := log.FromContext(ctx)
+
+
if record.Domain != h.c.Server.Hostname {
+
l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
+
return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
+
}
+
+
ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
+
if err != nil || !ok {
+
l.Error("failed to add member", "did", did)
+
return fmt.Errorf("failed to enforce permissions: %w", err)
+
}
+
+
l.Info("adding member")
+
if err := h.e.AddMember(ThisServer, record.Member); err != nil {
+
l.Error("failed to add member", "error", err)
+
return fmt.Errorf("failed to add member: %w", err)
+
}
+
l.Info("added member from firehose", "member", record.Member)
+
+
if err := h.db.AddDid(did); err != nil {
+
l.Error("failed to add did", "error", err)
+
return fmt.Errorf("failed to add did: %w", err)
+
}
+
+
if err := h.fetchAndAddKeys(ctx, did); err != nil {
+
return fmt.Errorf("failed to fetch and add keys: %w", err)
+
}
+
+
h.jc.UpdateDids([]string{did})
+
return nil
+
}
+
func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
l := log.FromContext(ctx)
···
defer resp.Body.Close()
if ct := resp.Header.Get("Content-Type"); !strings.HasPrefix(ct, "text/plain") {
-
l.Error("unexpected content type", "content-type", ct)
return fmt.Errorf("unexpected content type: %s", ct)
}
···
return nil
}
-
func (h *Handle) processKnotMember(ctx context.Context, did string, record map[string]interface{}) error {
-
l := log.FromContext(ctx)
+
func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
+
did := event.Did
-
if record["domain"] != h.c.Server.Hostname {
-
l.Error("domain mismatch", "domain", record["domain"], "expected", h.c.Server.Hostname)
-
return fmt.Errorf("domain mismatch: %s != %s", record["domain"], h.c.Server.Hostname)
-
}
+
raw := json.RawMessage(event.Commit.Record)
-
ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
-
if err != nil || !ok {
-
l.Error("failed to add member", "did", did)
-
return fmt.Errorf("failed to enforce permissions: %w", err)
-
}
-
-
l.Info("adding member")
-
if err := h.e.AddMember(ThisServer, record["member"].(string)); err != nil {
-
l.Error("failed to add member", "error", err)
-
return fmt.Errorf("failed to add member: %w", err)
-
}
-
l.Info("added member from firehose", "member", record["member"])
+
switch event.Commit.Collection {
+
case tangled.PublicKeyNSID:
+
var record tangled.PublicKey
+
if err := json.Unmarshal(raw, &record); err != nil {
+
return fmt.Errorf("failed to unmarshal record: %w", err)
+
}
+
if err := h.processPublicKey(ctx, did, record); err != nil {
+
return fmt.Errorf("failed to process public key: %w", err)
+
}
-
if err := h.db.AddDid(did); err != nil {
-
l.Error("failed to add did", "error", err)
-
return fmt.Errorf("failed to add did: %w", err)
+
case tangled.KnotMemberNSID:
+
var record tangled.KnotMember
+
if err := json.Unmarshal(raw, &record); err != nil {
+
return fmt.Errorf("failed to unmarshal record: %w", err)
+
}
+
if err := h.processKnotMember(ctx, did, record); err != nil {
+
return fmt.Errorf("failed to process knot member: %w", err)
+
}
}
-
if err := h.fetchAndAddKeys(ctx, did); err != nil {
-
return fmt.Errorf("failed to fetch and add keys: %w", err)
+
lastTimeUs := event.TimeUS
+
if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
+
return fmt.Errorf("failed to save last time us: %w", err)
}
-
h.js.UpdateDids([]string{did})
return nil
}
-
-
func (h *Handle) processMessages(ctx context.Context, messages <-chan []byte) {
-
l := log.FromContext(ctx)
-
l.Info("waiting for knot to be initialized")
-
<-h.init
-
l.Info("initialized jetstream watcher")
-
-
for msg := range messages {
-
var data map[string]interface{}
-
if err := json.Unmarshal(msg, &data); err != nil {
-
l.Error("error unmarshaling message", "error", err)
-
continue
-
}
-
-
if kind, ok := data["kind"].(string); ok && kind == "commit" {
-
commit := data["commit"].(map[string]interface{})
-
did := data["did"].(string)
-
record := commit["record"].(map[string]interface{})
-
-
var processErr error
-
switch commit["collection"].(string) {
-
case tangled.PublicKeyNSID:
-
if err := h.processPublicKey(ctx, did, record); err != nil {
-
processErr = fmt.Errorf("failed to process public key: %w", err)
-
}
-
case tangled.KnotMemberNSID:
-
if err := h.processKnotMember(ctx, did, record); err != nil {
-
processErr = fmt.Errorf("failed to process knot member: %w", err)
-
}
-
}
-
-
if processErr != nil {
-
l.Error("error processing message", "error", processErr)
-
continue
-
}
-
-
lastTimeUs := int64(data["time_us"].(float64))
-
if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
-
l.Error("failed to save last time us", "error", err)
-
continue
-
}
-
}
-
}
-
}
-170
knotserver/jsclient/jetstream.go
···
-
package jsclient
-
-
import (
-
"context"
-
"fmt"
-
"log"
-
"net/url"
-
"sync"
-
"time"
-
-
"github.com/gorilla/websocket"
-
)
-
-
type JetstreamClient struct {
-
collections []string
-
dids []string
-
conn *websocket.Conn
-
mu sync.RWMutex
-
reconnectCh chan struct{}
-
}
-
-
func NewJetstreamClient(collections, dids []string) *JetstreamClient {
-
return &JetstreamClient{
-
collections: collections,
-
dids: dids,
-
reconnectCh: make(chan struct{}, 1),
-
}
-
}
-
-
func (j *JetstreamClient) buildWebsocketURL(queryParams string) url.URL {
-
-
u := url.URL{
-
Scheme: "wss",
-
Host: "jetstream1.us-west.bsky.network",
-
Path: "/subscribe",
-
RawQuery: queryParams,
-
}
-
-
return u
-
}
-
-
// UpdateCollections updates the collections list and triggers a reconnection
-
func (j *JetstreamClient) UpdateCollections(collections []string) {
-
j.mu.Lock()
-
j.collections = collections
-
j.mu.Unlock()
-
j.triggerReconnect()
-
}
-
-
// UpdateDids updates the Dids list and triggers a reconnection
-
func (j *JetstreamClient) UpdateDids(dids []string) {
-
j.mu.Lock()
-
j.dids = dids
-
j.mu.Unlock()
-
j.triggerReconnect()
-
}
-
-
// Adds one did to the did list
-
func (j *JetstreamClient) AddDid(did string) {
-
j.mu.Lock()
-
j.dids = append(j.dids, did)
-
j.mu.Unlock()
-
j.triggerReconnect()
-
}
-
-
func (j *JetstreamClient) triggerReconnect() {
-
select {
-
case j.reconnectCh <- struct{}{}:
-
default:
-
// Channel already has a pending reconnect
-
}
-
}
-
-
func (j *JetstreamClient) buildQueryParams(cursor int64) string {
-
j.mu.RLock()
-
defer j.mu.RUnlock()
-
-
var collections, dids string
-
if len(j.collections) > 0 {
-
collections = fmt.Sprintf("wantedCollections=%s&cursor=%d", j.collections[0], cursor)
-
for _, collection := range j.collections[1:] {
-
collections += fmt.Sprintf("&wantedCollections=%s", collection)
-
}
-
}
-
if len(j.dids) > 0 {
-
for i, did := range j.dids {
-
if i == 0 {
-
dids = fmt.Sprintf("wantedDids=%s", did)
-
} else {
-
dids += fmt.Sprintf("&wantedDids=%s", did)
-
}
-
}
-
}
-
-
var queryStr string
-
if collections != "" && dids != "" {
-
queryStr = collections + "&" + dids
-
} else if collections != "" {
-
queryStr = collections
-
} else if dids != "" {
-
queryStr = dids
-
}
-
-
return queryStr
-
}
-
-
func (j *JetstreamClient) connect(cursor int64) error {
-
queryParams := j.buildQueryParams(cursor)
-
u := j.buildWebsocketURL(queryParams)
-
-
dialer := websocket.Dialer{
-
HandshakeTimeout: 10 * time.Second,
-
}
-
-
conn, _, err := dialer.Dial(u.String(), nil)
-
if err != nil {
-
return err
-
}
-
-
if j.conn != nil {
-
j.conn.Close()
-
}
-
j.conn = conn
-
return nil
-
}
-
-
func (j *JetstreamClient) readMessages(ctx context.Context, messages chan []byte) {
-
defer close(messages)
-
defer j.conn.Close()
-
-
ticker := time.NewTicker(1 * time.Second)
-
defer ticker.Stop()
-
-
for {
-
select {
-
case <-ctx.Done():
-
return
-
case <-j.reconnectCh:
-
// Reconnect with new parameters
-
cursor := time.Now().Add(-5 * time.Second).UnixMicro()
-
if err := j.connect(cursor); err != nil {
-
log.Printf("error reconnecting to jetstream: %v", err)
-
return
-
}
-
case <-ticker.C:
-
_, message, err := j.conn.ReadMessage()
-
if err != nil {
-
log.Printf("error reading from websocket: %v", err)
-
return
-
}
-
messages <- message
-
}
-
}
-
}
-
-
func (j *JetstreamClient) ReadJetstream(ctx context.Context, lastTimestamp int64) (chan []byte, error) {
-
if lastTimestamp == 0 {
-
lastTimestamp = time.Now().Add(-5 * time.Second).UnixMicro()
-
}
-
-
if err := j.connect(lastTimestamp); err != nil {
-
log.Printf("error connecting to jetstream: %v", err)
-
return nil, err
-
}
-
-
messages := make(chan []byte)
-
go j.readMessages(ctx, messages)
-
-
return messages, nil
-
}
+3 -3
knotserver/routes.go
···
return
}
-
h.js.UpdateDids([]string{did})
+
h.jc.UpdateDids([]string{did})
if err := h.e.AddMember(ThisServer, did); err != nil {
l.Error("adding member", "error", err.Error())
writeError(w, err.Error(), http.StatusInternalServerError)
···
writeError(w, err.Error(), http.StatusInternalServerError)
return
}
-
h.js.UpdateDids([]string{data.Did})
+
h.jc.UpdateDids([]string{data.Did})
repoName := filepath.Join(ownerDid, repo)
if err := h.e.AddRepo(data.Did, ThisServer, repoName); err != nil {
···
return
}
-
h.js.UpdateDids([]string{data.Did})
+
h.jc.UpdateDids([]string{data.Did})
if err := h.e.AddOwner(ThisServer, data.Did); err != nil {
l.Error("adding owner", "error", err.Error())
writeError(w, err.Error(), http.StatusInternalServerError)