appview: introduce knotstream consumer #239

merged
opened by oppi.li targeting master from push-qoplqnlvlqqo

similar to jetstream consumer, we now ingest events from every known knot.

Signed-off-by: oppiliappan me@oppi.li

Changed files
+119 -9
appview
+18 -8
appview/config/config.go
···
"context"
"fmt"
"net/url"
"github.com/sethvargo/go-envconfig"
)
···
Endpoint string `env:"ENDPOINT, default=wss://jetstream1.us-east.bsky.network/subscribe"`
}
type ResendConfig struct {
ApiKey string `env:"API_KEY"`
SentFrom string `env:"SENT_FROM, default=noreply@notifs.tangled.sh"`
···
}
type Config struct {
-
Core CoreConfig `env:",prefix=TANGLED_"`
-
Jetstream JetstreamConfig `env:",prefix=TANGLED_JETSTREAM_"`
-
Resend ResendConfig `env:",prefix=TANGLED_RESEND_"`
-
Posthog PosthogConfig `env:",prefix=TANGLED_POSTHOG_"`
-
Camo CamoConfig `env:",prefix=TANGLED_CAMO_"`
-
Avatar AvatarConfig `env:",prefix=TANGLED_AVATAR_"`
-
OAuth OAuthConfig `env:",prefix=TANGLED_OAUTH_"`
-
Redis RedisConfig `env:",prefix=TANGLED_REDIS_"`
}
func LoadConfig(ctx context.Context) (*Config, error) {
···
"context"
"fmt"
"net/url"
+
"time"
"github.com/sethvargo/go-envconfig"
)
···
Endpoint string `env:"ENDPOINT, default=wss://jetstream1.us-east.bsky.network/subscribe"`
}
+
type KnotstreamConfig struct {
+
RetryInterval time.Duration `env:"RETRY_INTERVAL, default=60s"`
+
MaxRetryInterval time.Duration `env:"MAX_RETRY_INTERVAL, default=120m"`
+
ConnectionTimeout time.Duration `env:"CONNECTION_TIMEOUT, default=5s"`
+
WorkerCount int `env:"WORKER_COUNT, default=64"`
+
QueueSize int `env:"QUEUE_SIZE, default=100"`
+
}
+
type ResendConfig struct {
ApiKey string `env:"API_KEY"`
SentFrom string `env:"SENT_FROM, default=noreply@notifs.tangled.sh"`
···
}
type Config struct {
+
Core CoreConfig `env:",prefix=TANGLED_"`
+
Jetstream JetstreamConfig `env:",prefix=TANGLED_JETSTREAM_"`
+
Knotstream KnotstreamConfig `env:",prefix=TANGLED_KNOTSTREAM_"`
+
Resend ResendConfig `env:",prefix=TANGLED_RESEND_"`
+
Posthog PosthogConfig `env:",prefix=TANGLED_POSTHOG_"`
+
Camo CamoConfig `env:",prefix=TANGLED_CAMO_"`
+
Avatar AvatarConfig `env:",prefix=TANGLED_AVATAR_"`
+
OAuth OAuthConfig `env:",prefix=TANGLED_OAUTH_"`
+
Redis RedisConfig `env:",prefix=TANGLED_REDIS_"`
}
func LoadConfig(ctx context.Context) (*Config, error) {
+89
appview/state/knotstream.go
···
···
+
package state
+
+
import (
+
"encoding/json"
+
"fmt"
+
"slices"
+
"time"
+
+
"tangled.sh/tangled.sh/core/api/tangled"
+
"tangled.sh/tangled.sh/core/appview/cache"
+
"tangled.sh/tangled.sh/core/appview/config"
+
"tangled.sh/tangled.sh/core/appview/db"
+
kc "tangled.sh/tangled.sh/core/knotclient"
+
"tangled.sh/tangled.sh/core/log"
+
"tangled.sh/tangled.sh/core/rbac"
+
)
+
+
func KnotstreamConsumer(c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*kc.EventConsumer, error) {
+
knots, err := db.GetCompletedRegistrations(d)
+
if err != nil {
+
return nil, err
+
}
+
+
srcs := make(map[kc.EventSource]struct{})
+
for _, k := range knots {
+
s := kc.EventSource{k}
+
srcs[s] = struct{}{}
+
}
+
+
logger := log.New("knotstream")
+
cache := cache.New(c.Redis.Addr)
+
cursorStore := kc.NewRedisCursorStore(cache)
+
+
cfg := kc.ConsumerConfig{
+
Sources: srcs,
+
ProcessFunc: knotstreamIngester(d, enforcer),
+
RetryInterval: c.Knotstream.RetryInterval,
+
MaxRetryInterval: c.Knotstream.MaxRetryInterval,
+
ConnectionTimeout: c.Knotstream.ConnectionTimeout,
+
WorkerCount: c.Knotstream.WorkerCount,
+
QueueSize: c.Knotstream.QueueSize,
+
Logger: logger,
+
Dev: c.Core.Dev,
+
CursorStore: &cursorStore,
+
}
+
+
return kc.NewEventConsumer(cfg), nil
+
}
+
+
func knotstreamIngester(d *db.DB, enforcer *rbac.Enforcer) kc.ProcessFunc {
+
return func(source kc.EventSource, msg kc.Message) error {
+
switch msg.Nsid {
+
case tangled.GitRefUpdateNSID:
+
return ingestRefUpdate(d, enforcer, source, msg)
+
case tangled.PipelineNSID:
+
// TODO
+
}
+
+
return nil
+
}
+
}
+
+
func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, source kc.EventSource, msg kc.Message) error {
+
var record tangled.GitRefUpdate
+
err := json.Unmarshal(msg.EventJson, &record)
+
if err != nil {
+
return err
+
}
+
+
knownKnots, err := enforcer.GetDomainsForUser(record.CommitterDid)
+
if err != nil {
+
return err
+
}
+
+
if !slices.Contains(knownKnots, source.Knot) {
+
return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot)
+
}
+
+
punch := db.Punch{
+
Did: record.CommitterDid,
+
Date: time.Now(),
+
Count: 1,
+
}
+
if err := db.AddPunch(d, punch); err != nil {
+
return err
+
}
+
+
return nil
+
}
+11
appview/state/state.go
···
jc *jetstream.JetstreamClient
config *config.Config
repoResolver *reporesolver.RepoResolver
}
func Make(config *config.Config) (*State, error) {
···
return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
}
state := &State{
d,
oauth,
···
jc,
config,
repoResolver,
}
return state, nil
···
return
}
w.Write([]byte("check success"))
}
···
jc *jetstream.JetstreamClient
config *config.Config
repoResolver *reporesolver.RepoResolver
+
knotstream *knotclient.EventConsumer
}
func Make(config *config.Config) (*State, error) {
···
return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
}
+
knotstream, err := KnotstreamConsumer(config, d, enforcer)
+
if err != nil {
+
return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
+
}
+
knotstream.Start(context.Background())
+
state := &State{
d,
oauth,
···
jc,
config,
repoResolver,
+
knotstream,
}
return state, nil
···
return
}
+
// add this knot to knotstream
+
go s.knotstream.AddSource(context.Background(), knotclient.EventSource{domain})
+
w.Write([]byte("check success"))
}
+1 -1
flake.nix
···
inherit (gitignore.lib) gitignoreSource;
in {
overlays.default = final: prev: let
-
goModHash = "sha256-ZckpIPqFk7/XBiEJUbmrAzdjAxV62hv896xqAXF2aZs=";
appviewDeps = {
inherit htmx-src lucide-src inter-fonts-src ibm-plex-mono-src goModHash gitignoreSource;
};
···
inherit (gitignore.lib) gitignoreSource;
in {
overlays.default = final: prev: let
+
goModHash = "sha256-QPyeKKr7YMblwicQNemu3OamXwg7fVie6/IY10vQCl4=";
appviewDeps = {
inherit htmx-src lucide-src inter-fonts-src ibm-plex-mono-src goModHash gitignoreSource;
};