From 68b4e69c6dbafbe7291d72c9f505ad3e775078e4 Mon Sep 17 00:00:00 2001 From: oppiliappan Date: Tue, 10 Jun 2025 12:15:03 +0100 Subject: [PATCH] appview: introduce knotstream consumer Change-Id: qoplqnlvlqqoyxpotuqnqtwtslttlkln similar to jetstream consumer, we now ingest events from every known knot. Signed-off-by: oppiliappan --- appview/config/config.go | 26 +++++++---- appview/state/knotstream.go | 89 +++++++++++++++++++++++++++++++++++++ appview/state/state.go | 11 +++++ flake.nix | 2 +- 4 files changed, 119 insertions(+), 9 deletions(-) create mode 100644 appview/state/knotstream.go diff --git a/appview/config/config.go b/appview/config/config.go index 8c6f12d..3f1de1c 100644 --- a/appview/config/config.go +++ b/appview/config/config.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/url" + "time" "github.com/sethvargo/go-envconfig" ) @@ -24,6 +25,14 @@ type JetstreamConfig struct { Endpoint string `env:"ENDPOINT, default=wss://jetstream1.us-east.bsky.network/subscribe"` } +type KnotstreamConfig struct { + RetryInterval time.Duration `env:"RETRY_INTERVAL, default=60s"` + MaxRetryInterval time.Duration `env:"MAX_RETRY_INTERVAL, default=120m"` + ConnectionTimeout time.Duration `env:"CONNECTION_TIMEOUT, default=5s"` + WorkerCount int `env:"WORKER_COUNT, default=64"` + QueueSize int `env:"QUEUE_SIZE, default=100"` +} + type ResendConfig struct { ApiKey string `env:"API_KEY"` SentFrom string `env:"SENT_FROM, default=noreply@notifs.tangled.sh"` @@ -65,14 +74,15 @@ func (cfg RedisConfig) ToURL() string { } type Config struct { - Core CoreConfig `env:",prefix=TANGLED_"` - Jetstream JetstreamConfig `env:",prefix=TANGLED_JETSTREAM_"` - Resend ResendConfig `env:",prefix=TANGLED_RESEND_"` - Posthog PosthogConfig `env:",prefix=TANGLED_POSTHOG_"` - Camo CamoConfig `env:",prefix=TANGLED_CAMO_"` - Avatar AvatarConfig `env:",prefix=TANGLED_AVATAR_"` - OAuth OAuthConfig `env:",prefix=TANGLED_OAUTH_"` - Redis RedisConfig `env:",prefix=TANGLED_REDIS_"` + Core CoreConfig `env:",prefix=TANGLED_"` + Jetstream JetstreamConfig `env:",prefix=TANGLED_JETSTREAM_"` + Knotstream KnotstreamConfig `env:",prefix=TANGLED_KNOTSTREAM_"` + Resend ResendConfig `env:",prefix=TANGLED_RESEND_"` + Posthog PosthogConfig `env:",prefix=TANGLED_POSTHOG_"` + Camo CamoConfig `env:",prefix=TANGLED_CAMO_"` + Avatar AvatarConfig `env:",prefix=TANGLED_AVATAR_"` + OAuth OAuthConfig `env:",prefix=TANGLED_OAUTH_"` + Redis RedisConfig `env:",prefix=TANGLED_REDIS_"` } func LoadConfig(ctx context.Context) (*Config, error) { diff --git a/appview/state/knotstream.go b/appview/state/knotstream.go new file mode 100644 index 0000000..befa2d3 --- /dev/null +++ b/appview/state/knotstream.go @@ -0,0 +1,89 @@ +package state + +import ( + "encoding/json" + "fmt" + "slices" + "time" + + "tangled.sh/tangled.sh/core/api/tangled" + "tangled.sh/tangled.sh/core/appview/cache" + "tangled.sh/tangled.sh/core/appview/config" + "tangled.sh/tangled.sh/core/appview/db" + kc "tangled.sh/tangled.sh/core/knotclient" + "tangled.sh/tangled.sh/core/log" + "tangled.sh/tangled.sh/core/rbac" +) + +func KnotstreamConsumer(c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*kc.EventConsumer, error) { + knots, err := db.GetCompletedRegistrations(d) + if err != nil { + return nil, err + } + + srcs := make(map[kc.EventSource]struct{}) + for _, k := range knots { + s := kc.EventSource{k} + srcs[s] = struct{}{} + } + + logger := log.New("knotstream") + cache := cache.New(c.Redis.Addr) + cursorStore := kc.NewRedisCursorStore(cache) + + cfg := kc.ConsumerConfig{ + Sources: srcs, + ProcessFunc: knotstreamIngester(d, enforcer), + RetryInterval: c.Knotstream.RetryInterval, + MaxRetryInterval: c.Knotstream.MaxRetryInterval, + ConnectionTimeout: c.Knotstream.ConnectionTimeout, + WorkerCount: c.Knotstream.WorkerCount, + QueueSize: c.Knotstream.QueueSize, + Logger: logger, + Dev: c.Core.Dev, + CursorStore: &cursorStore, + } + + return kc.NewEventConsumer(cfg), nil +} + +func knotstreamIngester(d *db.DB, enforcer *rbac.Enforcer) kc.ProcessFunc { + return func(source kc.EventSource, msg kc.Message) error { + switch msg.Nsid { + case tangled.GitRefUpdateNSID: + return ingestRefUpdate(d, enforcer, source, msg) + case tangled.PipelineNSID: + // TODO + } + + return nil + } +} + +func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, source kc.EventSource, msg kc.Message) error { + var record tangled.GitRefUpdate + err := json.Unmarshal(msg.EventJson, &record) + if err != nil { + return err + } + + knownKnots, err := enforcer.GetDomainsForUser(record.CommitterDid) + if err != nil { + return err + } + + if !slices.Contains(knownKnots, source.Knot) { + return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot) + } + + punch := db.Punch{ + Did: record.CommitterDid, + Date: time.Now(), + Count: 1, + } + if err := db.AddPunch(d, punch); err != nil { + return err + } + + return nil +} diff --git a/appview/state/state.go b/appview/state/state.go index 43e4c15..648952c 100644 --- a/appview/state/state.go +++ b/appview/state/state.go @@ -45,6 +45,7 @@ type State struct { jc *jetstream.JetstreamClient config *config.Config repoResolver *reporesolver.RepoResolver + knotstream *knotclient.EventConsumer } func Make(config *config.Config) (*State, error) { @@ -108,6 +109,12 @@ func Make(config *config.Config) (*State, error) { return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) } + knotstream, err := KnotstreamConsumer(config, d, enforcer) + if err != nil { + return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) + } + knotstream.Start(context.Background()) + state := &State{ d, oauth, @@ -120,6 +127,7 @@ func Make(config *config.Config) (*State, error) { jc, config, repoResolver, + knotstream, } return state, nil @@ -357,6 +365,9 @@ func (s *State) InitKnotServer(w http.ResponseWriter, r *http.Request) { return } + // add this knot to knotstream + go s.knotstream.AddSource(context.Background(), knotclient.EventSource{domain}) + w.Write([]byte("check success")) } diff --git a/flake.nix b/flake.nix index 07f663e..5676472 100644 --- a/flake.nix +++ b/flake.nix @@ -54,7 +54,7 @@ inherit (gitignore.lib) gitignoreSource; in { overlays.default = final: prev: let - goModHash = "sha256-ZckpIPqFk7/XBiEJUbmrAzdjAxV62hv896xqAXF2aZs="; + goModHash = "sha256-QPyeKKr7YMblwicQNemu3OamXwg7fVie6/IY10vQCl4="; appviewDeps = { inherit htmx-src lucide-src inter-fonts-src ibm-plex-mono-src goModHash gitignoreSource; }; -- 2.43.0