forked from tangled.org/core
Monorepo for Tangled — https://tangled.org

knotserver/jetstream: start jetstream watcher from last time_us

If last time_us is older than a week, we discard it and start over.
This also refreshes the saved last time_us in the database.

anirudh.fi 05f2348d 3876c9e7

verified
Changed files
+48 -11
knotserver
+4 -7
knotserver/db/init.go
···
create table if not exists known_dids (
did text primary key
);
+
create table if not exists public_keys (
id integer primary key autoincrement,
did text not null,
···
created timestamp default current_timestamp,
unique(did, name)
);
-
create table if not exists access_levels (
+
+
create table if not exists _jetstream (
id integer primary key autoincrement,
-
repo_id integer not null,
-
did text not null,
-
access text not null check (access in ('OWNER', 'WRITER')),
-
created timestamp default current_timestamp,
-
unique(repo_id, did),
-
foreign key (repo_id) references repos(id) on delete cascade
+
last_time_us integer not null
);
`)
if err != nil {
+13
knotserver/db/jetstream.go
···
+
package db
+
+
func (d *DB) SaveLastTimeUs(lastTimeUs int64) error {
+
_, err := d.db.Exec(`insert into _jetstream (last_time_us) values (?)`, lastTimeUs)
+
return err
+
}
+
+
func (d *DB) GetLastTimeUs() (int64, error) {
+
var lastTimeUs int64
+
row := d.db.QueryRow(`select last_time_us from _jetstream`)
+
err := row.Scan(&lastTimeUs)
+
return lastTimeUs, err
+
}
+26 -1
knotserver/handler.go
···
"fmt"
"log"
"net/http"
+
"time"
"github.com/go-chi/chi/v5"
tangled "github.com/sotangled/tangled/api/tangled"
···
collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID}
dids := []string{}
+
var lastTimeUs int64
+
var err error
+
lastTimeUs, err = h.db.GetLastTimeUs()
+
if err != nil {
+
log.Println("couldn't get last time us, starting from now")
+
lastTimeUs = time.Now().UnixMicro()
+
}
+
// If last time is older than a week, start from now
+
if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 {
+
lastTimeUs = time.Now().UnixMicro()
+
log.Printf("last time us is older than a week. discarding that and starting from now.")
+
err = h.db.SaveLastTimeUs(lastTimeUs)
+
if err != nil {
+
log.Println("failed to save last time us")
+
}
+
}
+
+
log.Printf("found last time_us %d", lastTimeUs)
+
h.js = jsclient.NewJetstreamClient(collections, dids)
-
messages, err := h.js.ReadJetstream(ctx)
+
messages, err := h.js.ReadJetstream(ctx, lastTimeUs)
if err != nil {
return fmt.Errorf("failed to read from jetstream: %w", err)
}
···
h.e.AddMember(ThisServer, record["member"].(string))
}
default:
+
}
+
+
lastTimeUs := int64(data["time_us"].(float64))
+
if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
+
log.Printf("failed to save last time us: %v", err)
}
}
+5 -3
knotserver/jsclient/jetstream.go
···
}
}
-
func (j *JetstreamClient) ReadJetstream(ctx context.Context) (chan []byte, error) {
-
fiveSecondsAgo := time.Now().Add(-5 * time.Second).UnixMicro()
+
func (j *JetstreamClient) ReadJetstream(ctx context.Context, lastTimestamp int64) (chan []byte, error) {
+
if lastTimestamp == 0 {
+
lastTimestamp = time.Now().Add(-5 * time.Second).UnixMicro()
+
}
-
if err := j.connect(fiveSecondsAgo); err != nil {
+
if err := j.connect(lastTimestamp); err != nil {
log.Printf("error connecting to jetstream: %v", err)
return nil, err
}