forked from tangled.org/core
this repo has no description

spindle,cmd/spindle: init spindle server

Signed-off-by: Anirudh Oppiliappan <anirudh@tangled.sh>

anirudh.fi 55ef369a 9d70af5e

verified
Changed files
+345
cmd
spindle
spindle
+14
cmd/spindle/main.go
···
package main
+
import (
+
"context"
+
"os"
+
+
"tangled.sh/tangled.sh/core/log"
+
"tangled.sh/tangled.sh/core/spindle"
+
)
+
func main() {
+
ctx := log.NewContext(context.Background(), "spindle")
+
err := spindle.Run(ctx)
+
if err != nil {
+
log.FromContext(ctx).Error("error running spindle", "error", err)
+
os.Exit(-1)
+
}
}
+28
spindle/config/config.go
···
+
package config
+
+
import (
+
"context"
+
+
"github.com/sethvargo/go-envconfig"
+
)
+
+
type Server struct {
+
ListenAddr string `env:"LISTEN_ADDR, default=0.0.0.0:6555"`
+
DBPath string `env:"DB_PATH, default=spindle.db"`
+
Hostname string `env:"HOSTNAME, required"`
+
JetstreamEndpoint string `env:"JETSTREAM_ENDPOINT, default=wss://jetstream1.us-west.bsky.network/subscribe"`
+
}
+
+
type Config struct {
+
Server Server `env:",prefix=SPINDLE_SERVER_"`
+
}
+
+
func Load(ctx context.Context) (*Config, error) {
+
var cfg Config
+
err := envconfig.Process(ctx, &cfg)
+
if err != nil {
+
return nil, err
+
}
+
+
return &cfg, nil
+
}
+56
spindle/db/db.go
···
+
package db
+
+
import "database/sql"
+
+
type DB struct {
+
*sql.DB
+
}
+
+
func Make(dbPath string) (*DB, error) {
+
db, err := sql.Open("sqlite3", dbPath)
+
if err != nil {
+
return nil, err
+
}
+
+
_, err = db.Exec(`
+
pragma journal_mode = WAL;
+
pragma synchronous = normal;
+
pragma foreign_keys = on;
+
pragma temp_store = memory;
+
pragma mmap_size = 30000000000;
+
pragma page_size = 32768;
+
pragma auto_vacuum = incremental;
+
pragma busy_timeout = 5000;
+
+
create table if not exists known_dids (
+
did text primary key
+
);
+
+
create table if not exists pipelines (
+
rkey text not null,
+
pipeline text not null, -- json
+
primary key rkey
+
);
+
`)
+
if err != nil {
+
return nil, err
+
}
+
+
return &DB{db}, nil
+
}
+
+
func (d *DB) SaveLastTimeUs(lastTimeUs int64) error {
+
_, err := d.Exec(`
+
insert into _jetstream (id, last_time_us)
+
values (1, ?)
+
on conflict(id) do update set last_time_us = excluded.last_time_us
+
`, lastTimeUs)
+
return err
+
}
+
+
func (d *DB) GetLastTimeUs() (int64, error) {
+
var lastTimeUs int64
+
row := d.QueryRow(`select last_time_us from _jetstream where id = 1;`)
+
err := row.Scan(&lastTimeUs)
+
return lastTimeUs, err
+
}
+83
spindle/db/pipelines.go
···
+
package db
+
+
import (
+
"fmt"
+
)
+
+
type Pipeline struct {
+
Rkey string
+
PipelineJson string
+
}
+
+
func (d *DB) InsertPipeline(pipeline Pipeline) error {
+
_, err := d.Exec(
+
`insert into pipelines (rkey, nsid, event) values (?, ?, ?)`,
+
pipeline.Rkey,
+
pipeline.PipelineJson,
+
)
+
+
return err
+
}
+
+
func (d *DB) GetPipeline(rkey, cursor string) (Pipeline, error) {
+
whereClause := "where rkey = ?"
+
args := []any{rkey}
+
+
if cursor != "" {
+
whereClause += " and rkey > ?"
+
args = append(args, cursor)
+
}
+
+
query := fmt.Sprintf(`
+
select rkey, pipeline
+
from pipelines
+
%s
+
limit 1
+
`, whereClause)
+
+
row := d.QueryRow(query, args...)
+
+
var p Pipeline
+
err := row.Scan(&p.Rkey, &p.PipelineJson)
+
if err != nil {
+
return Pipeline{}, err
+
}
+
+
return p, nil
+
}
+
+
func (d *DB) GetPipelines(cursor string) ([]Pipeline, error) {
+
whereClause := ""
+
args := []any{}
+
if cursor != "" {
+
whereClause = "where rkey > ?"
+
args = append(args, cursor)
+
}
+
+
query := fmt.Sprintf(`
+
select rkey, nsid, pipeline
+
from pipelines
+
%s
+
order by rkey asc
+
limit 100
+
`, whereClause)
+
+
rows, err := d.Query(query, args...)
+
if err != nil {
+
return nil, err
+
}
+
defer rows.Close()
+
+
var evts []Pipeline
+
for rows.Next() {
+
var ev Pipeline
+
rows.Scan(&ev.Rkey, &ev.PipelineJson)
+
evts = append(evts, ev)
+
}
+
+
if err := rows.Err(); err != nil {
+
return nil, err
+
}
+
+
return evts, nil
+
}
+71
spindle/server.go
···
+
package spindle
+
+
import (
+
"fmt"
+
"log/slog"
+
"net/http"
+
+
"golang.org/x/net/context"
+
"tangled.sh/tangled.sh/core/api/tangled"
+
"tangled.sh/tangled.sh/core/jetstream"
+
"tangled.sh/tangled.sh/core/knotserver/notifier"
+
"tangled.sh/tangled.sh/core/log"
+
"tangled.sh/tangled.sh/core/rbac"
+
"tangled.sh/tangled.sh/core/spindle/config"
+
"tangled.sh/tangled.sh/core/spindle/db"
+
)
+
+
type Spindle struct {
+
jc *jetstream.JetstreamClient
+
db *db.DB
+
e *rbac.Enforcer
+
l *slog.Logger
+
n *notifier.Notifier
+
}
+
+
func Run(ctx context.Context) error {
+
cfg, err := config.Load(ctx)
+
if err != nil {
+
return fmt.Errorf("failed to load config: %w", err)
+
}
+
+
d, err := db.Make(cfg.Server.DBPath)
+
if err != nil {
+
return fmt.Errorf("failed to setup db: %w", err)
+
}
+
+
e, err := rbac.NewEnforcer(cfg.Server.DBPath)
+
if err != nil {
+
return fmt.Errorf("failed to setup rbac enforcer: %w", err)
+
}
+
+
logger := log.FromContext(ctx)
+
+
collections := []string{tangled.SpindleMemberNSID}
+
jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, logger, d, true, false)
+
if err != nil {
+
return fmt.Errorf("failed to setup jetstream client: %w", err)
+
}
+
+
n := notifier.New()
+
+
spindle := Spindle{
+
jc: jc,
+
e: e,
+
db: d,
+
l: logger,
+
n: &n,
+
}
+
+
logger.Info("starting spindle server", "address", cfg.Server.ListenAddr)
+
logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router()))
+
+
return nil
+
}
+
+
func (s *Spindle) Router() http.Handler {
+
mux := &http.ServeMux{}
+
+
mux.HandleFunc("/events", s.Events)
+
return mux
+
}
+93
spindle/stream.go
···
+
package spindle
+
+
import (
+
"net/http"
+
"time"
+
+
"github.com/gorilla/websocket"
+
"golang.org/x/net/context"
+
)
+
+
var upgrader = websocket.Upgrader{
+
ReadBufferSize: 1024,
+
WriteBufferSize: 1024,
+
}
+
+
func (s *Spindle) Events(w http.ResponseWriter, r *http.Request) {
+
l := s.l.With("handler", "Events")
+
l.Info("received new connection")
+
+
conn, err := upgrader.Upgrade(w, r, nil)
+
if err != nil {
+
l.Error("websocket upgrade failed", "err", err)
+
w.WriteHeader(http.StatusInternalServerError)
+
return
+
}
+
defer conn.Close()
+
l.Info("upgraded http to wss")
+
+
ch := s.n.Subscribe()
+
defer s.n.Unsubscribe(ch)
+
+
ctx, cancel := context.WithCancel(r.Context())
+
defer cancel()
+
go func() {
+
for {
+
if _, _, err := conn.NextReader(); err != nil {
+
l.Error("failed to read", "err", err)
+
cancel()
+
return
+
}
+
}
+
}()
+
+
cursor := ""
+
+
// complete backfill first before going to live data
+
l.Info("going through backfill", "cursor", cursor)
+
if err := s.streamPipelines(conn, &cursor); err != nil {
+
l.Error("failed to backfill", "err", err)
+
return
+
}
+
+
for {
+
// wait for new data or timeout
+
select {
+
case <-ctx.Done():
+
l.Info("stopping stream: client closed connection")
+
return
+
case <-ch:
+
// we have been notified of new data
+
l.Info("going through live data", "cursor", cursor)
+
if err := s.streamPipelines(conn, &cursor); err != nil {
+
l.Error("failed to stream", "err", err)
+
return
+
}
+
case <-time.After(30 * time.Second):
+
// send a keep-alive
+
l.Info("sent keepalive")
+
if err = conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second)); err != nil {
+
l.Error("failed to write control", "err", err)
+
}
+
}
+
}
+
}
+
+
func (s *Spindle) streamPipelines(conn *websocket.Conn, cursor *string) error {
+
ops, err := s.db.GetPipelines(*cursor)
+
if err != nil {
+
s.l.Debug("err", "err", err)
+
return err
+
}
+
s.l.Debug("ops", "ops", ops)
+
+
for _, op := range ops {
+
if err := conn.WriteJSON(op); err != nil {
+
s.l.Debug("err", "err", err)
+
return err
+
}
+
*cursor = op.Rkey
+
}
+
+
return nil
+
}