forked from tangled.org/core
this repo has no description

jestream: allow caller to optionally waitForDid

This is handy because the knotserver only subscribes to specific dids,
and we don't want jestream to run without (which defaults to all dids).
The oppsite is true for the appview which wants to see all dids.

Also adds some nil checks to ensure it doesn't break when event is not a
Commit.

Changed files
+25 -9
appview
state
cmd
knotserver
jetstream
knotserver
+7 -2
appview/state/state.go
···
resolver := appview.NewResolver()
-
jc, err := jetstream.NewJetstreamClient("appview", []string{tangled.GraphFollowNSID}, nil, db)
if err != nil {
return nil, fmt.Errorf("failed to create jetstream client: %w", err)
}
err = jc.StartJetstream(context.Background(), func(ctx context.Context, e *models.Event) error {
did := e.Did
-
raw := e.Commit.Record
switch e.Commit.Collection {
case tangled.GraphFollowNSID:
···
resolver := appview.NewResolver()
+
jc, err := jetstream.NewJetstreamClient("appview", []string{tangled.GraphFollowNSID}, nil, db, false)
if err != nil {
return nil, fmt.Errorf("failed to create jetstream client: %w", err)
}
err = jc.StartJetstream(context.Background(), func(ctx context.Context, e *models.Event) error {
+
if e.Kind != models.EventKindCommit {
+
return nil
+
}
+
did := e.Did
+
fmt.Println("got event", e.Commit.Collection, e.Commit.RKey, e.Commit.Record)
+
raw := json.RawMessage(e.Commit.Record)
switch e.Commit.Collection {
case tangled.GraphFollowNSID:
+1 -1
cmd/knotserver/main.go
···
jc, err := jetstream.NewJetstreamClient("knotserver", []string{
tangled.PublicKeyNSID,
tangled.KnotMemberNSID,
-
}, nil, db)
if err != nil {
l.Error("failed to setup jetstream", "error", err)
}
···
jc, err := jetstream.NewJetstreamClient("knotserver", []string{
tangled.PublicKeyNSID,
tangled.KnotMemberNSID,
+
}, nil, db, true)
if err != nil {
l.Error("failed to setup jetstream", "error", err)
}
+14 -6
jetstream/jetstream.go
···
db DB
reconnectCh chan struct{}
mu sync.RWMutex
}
···
j.reconnectCh <- struct{}{}
}
-
func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, db DB) (*JetstreamClient, error) {
if cfg == nil {
cfg = client.DefaultClientConfig()
cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
···
}
return &JetstreamClient{
-
cfg: cfg,
-
ident: ident,
-
db: db,
reconnectCh: make(chan struct{}, 1),
}, nil
}
···
go func() {
lastTimeUs := j.getLastTimeUs(ctx)
-
for len(j.cfg.WantedDids) == 0 {
-
time.Sleep(time.Second)
}
j.connectAndRead(ctx, &lastTimeUs)
}()
···
db DB
reconnectCh chan struct{}
+
waitForDid bool
mu sync.RWMutex
}
···
j.reconnectCh <- struct{}{}
}
+
func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, db DB, waitForDid bool) (*JetstreamClient, error) {
if cfg == nil {
cfg = client.DefaultClientConfig()
cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
···
}
return &JetstreamClient{
+
cfg: cfg,
+
ident: ident,
+
db: db,
+
+
// This will make the goroutine in StartJetstream wait until
+
// cfg.WantedDids has been populated, typically using UpdateDids.
+
waitForDid: waitForDid,
reconnectCh: make(chan struct{}, 1),
}, nil
}
···
go func() {
lastTimeUs := j.getLastTimeUs(ctx)
+
if j.waitForDid {
+
for len(j.cfg.WantedDids) == 0 {
+
time.Sleep(time.Second)
+
}
}
+
logger.Info("done waiting for did")
j.connectAndRead(ctx, &lastTimeUs)
}()
+3
knotserver/jetstream.go
···
func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
did := event.Did
raw := json.RawMessage(event.Commit.Record)
···
func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
did := event.Did
+
if event.Kind != models.EventKindCommit {
+
return nil
+
}
raw := json.RawMessage(event.Commit.Record)