appview: consumer events from spindles #249

merged
opened by oppi.li targeting master from push-mwkwusmyymno
Changed files
+215 -41
appview
config
pages
templates
spindles
state
+11 -10
appview/config/config.go
···
Endpoint string `env:"ENDPOINT, default=wss://jetstream1.us-east.bsky.network/subscribe"`
}
-
type KnotstreamConfig struct {
+
type ConsumerConfig struct {
RetryInterval time.Duration `env:"RETRY_INTERVAL, default=60s"`
MaxRetryInterval time.Duration `env:"MAX_RETRY_INTERVAL, default=120m"`
ConnectionTimeout time.Duration `env:"CONNECTION_TIMEOUT, default=5s"`
···
}
type Config struct {
-
Core CoreConfig `env:",prefix=TANGLED_"`
-
Jetstream JetstreamConfig `env:",prefix=TANGLED_JETSTREAM_"`
-
Knotstream KnotstreamConfig `env:",prefix=TANGLED_KNOTSTREAM_"`
-
Resend ResendConfig `env:",prefix=TANGLED_RESEND_"`
-
Posthog PosthogConfig `env:",prefix=TANGLED_POSTHOG_"`
-
Camo CamoConfig `env:",prefix=TANGLED_CAMO_"`
-
Avatar AvatarConfig `env:",prefix=TANGLED_AVATAR_"`
-
OAuth OAuthConfig `env:",prefix=TANGLED_OAUTH_"`
-
Redis RedisConfig `env:",prefix=TANGLED_REDIS_"`
+
Core CoreConfig `env:",prefix=TANGLED_"`
+
Jetstream JetstreamConfig `env:",prefix=TANGLED_JETSTREAM_"`
+
Knotstream ConsumerConfig `env:",prefix=TANGLED_KNOTSTREAM_"`
+
Spindlestream ConsumerConfig `env:",prefix=TANGLED_SPINDLESTREAM_"`
+
Resend ResendConfig `env:",prefix=TANGLED_RESEND_"`
+
Posthog PosthogConfig `env:",prefix=TANGLED_POSTHOG_"`
+
Camo CamoConfig `env:",prefix=TANGLED_CAMO_"`
+
Avatar AvatarConfig `env:",prefix=TANGLED_AVATAR_"`
+
OAuth OAuthConfig `env:",prefix=TANGLED_OAUTH_"`
+
Redis RedisConfig `env:",prefix=TANGLED_REDIS_"`
}
func LoadConfig(ctx context.Context) (*Config, error) {
+3 -3
appview/pages/templates/spindles/index.html
···
{{ end }}
{{ define "all" }}
-
<section class="rounded bg-white dark:bg-gray-800 drop-shadow-sm w-full flex flex-col gap-2">
+
<section class="rounded w-full flex flex-col gap-2">
<h2 class="text-sm font-bold py-2 uppercase dark:text-gray-300">your spindles</h2>
<div class="flex flex-col rounded border border-gray-200 dark:border-gray-700 w-full">
{{ range $spindle := .Spindles }}
···
{{ end }}
{{ define "register" }}
-
<section class="rounded bg-white dark:bg-gray-800 drop-shadow-sm w-full lg:w-fit flex flex-col gap-2">
+
<section class="rounded w-full lg:w-fit flex flex-col gap-2">
<h2 class="text-sm font-bold py-2 uppercase dark:text-gray-300">register a spindle</h2>
-
<p class="mb-2 dark:text-gray-300">Enter the hostname of your spindle to get started</p>
+
<p class="mb-2 dark:text-gray-300">Enter the hostname of your spindle to get started.</p>
<form
hx-post="/spindles/register"
class="max-w-2xl mb-2 space-y-4"
+82 -14
appview/state/knotstream.go
···
"tangled.sh/tangled.sh/core/appview/cache"
"tangled.sh/tangled.sh/core/appview/config"
"tangled.sh/tangled.sh/core/appview/db"
-
kc "tangled.sh/tangled.sh/core/knotclient"
-
"tangled.sh/tangled.sh/core/knotclient/cursor"
+
ec "tangled.sh/tangled.sh/core/eventconsumer"
+
"tangled.sh/tangled.sh/core/eventconsumer/cursor"
"tangled.sh/tangled.sh/core/log"
"tangled.sh/tangled.sh/core/rbac"
+
"tangled.sh/tangled.sh/core/workflow"
+
"github.com/bluesky-social/indigo/atproto/syntax"
"github.com/posthog/posthog-go"
)
-
func KnotstreamConsumer(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*kc.EventConsumer, error) {
+
func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client) (*ec.Consumer, error) {
knots, err := db.GetCompletedRegistrations(d)
if err != nil {
return nil, err
}
-
srcs := make(map[kc.EventSource]struct{})
+
srcs := make(map[ec.Source]struct{})
for _, k := range knots {
-
s := kc.EventSource{k}
+
s := ec.NewKnotSource(k)
srcs[s] = struct{}{}
}
···
cache := cache.New(c.Redis.Addr)
cursorStore := cursor.NewRedisCursorStore(cache)
-
cfg := kc.ConsumerConfig{
+
cfg := ec.ConsumerConfig{
Sources: srcs,
-
ProcessFunc: knotstreamIngester(ctx, d, enforcer, posthog, c.Core.Dev),
+
ProcessFunc: knotIngester(ctx, d, enforcer, posthog, c.Core.Dev),
RetryInterval: c.Knotstream.RetryInterval,
MaxRetryInterval: c.Knotstream.MaxRetryInterval,
ConnectionTimeout: c.Knotstream.ConnectionTimeout,
···
CursorStore: &cursorStore,
}
-
return kc.NewEventConsumer(cfg), nil
+
return ec.NewConsumer(cfg), nil
}
-
func knotstreamIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) kc.ProcessFunc {
-
return func(ctx context.Context, source kc.EventSource, msg kc.Message) error {
+
func knotIngester(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, dev bool) ec.ProcessFunc {
+
return func(ctx context.Context, source ec.Source, msg ec.Message) error {
switch msg.Nsid {
case tangled.GitRefUpdateNSID:
return ingestRefUpdate(d, enforcer, posthog, dev, source, msg)
case tangled.PipelineNSID:
-
// TODO
+
return ingestPipeline(d, source, msg)
}
return nil
}
}
-
func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source kc.EventSource, msg kc.Message) error {
+
func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, dev bool, source ec.Source, msg ec.Message) error {
var record tangled.GitRefUpdate
err := json.Unmarshal(msg.EventJson, &record)
if err != nil {
···
if err != nil {
return err
}
-
if !slices.Contains(knownKnots, source.Knot) {
-
return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Knot)
+
if !slices.Contains(knownKnots, source.Key()) {
+
return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
}
knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
···
return nil
}
+
+
func ingestPipeline(d *db.DB, source ec.Source, msg ec.Message) error {
+
var record tangled.Pipeline
+
err := json.Unmarshal(msg.EventJson, &record)
+
if err != nil {
+
return err
+
}
+
+
if record.TriggerMetadata == nil {
+
return fmt.Errorf("empty trigger metadata: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
+
}
+
+
if record.TriggerMetadata.Repo == nil {
+
return fmt.Errorf("empty repo: nsid %s, rkey %s", msg.Nsid, msg.Rkey)
+
}
+
+
// trigger info
+
var trigger db.Trigger
+
var sha string
+
switch record.TriggerMetadata.Kind {
+
case workflow.TriggerKindPush:
+
trigger.Kind = workflow.TriggerKindPush
+
trigger.PushRef = &record.TriggerMetadata.Push.Ref
+
trigger.PushNewSha = &record.TriggerMetadata.Push.NewSha
+
trigger.PushOldSha = &record.TriggerMetadata.Push.OldSha
+
sha = *trigger.PushNewSha
+
case workflow.TriggerKindPullRequest:
+
trigger.Kind = workflow.TriggerKindPush
+
trigger.PRSourceBranch = &record.TriggerMetadata.PullRequest.SourceBranch
+
trigger.PRTargetBranch = &record.TriggerMetadata.PullRequest.TargetBranch
+
trigger.PRSourceSha = &record.TriggerMetadata.PullRequest.SourceSha
+
trigger.PRAction = &record.TriggerMetadata.PullRequest.Action
+
sha = *trigger.PRSourceSha
+
}
+
+
tx, err := d.Begin()
+
if err != nil {
+
return err
+
}
+
+
triggerId, err := db.AddTrigger(tx, trigger)
+
if err != nil {
+
return err
+
}
+
+
pipeline := db.Pipeline{
+
Rkey: msg.Rkey,
+
Knot: source.Key(),
+
RepoOwner: syntax.DID(record.TriggerMetadata.Repo.Did),
+
RepoName: record.TriggerMetadata.Repo.Repo,
+
TriggerId: int(triggerId),
+
Sha: sha,
+
}
+
+
err = db.AddPipeline(tx, pipeline)
+
if err != nil {
+
return err
+
}
+
+
err = tx.Commit()
+
if err != nil {
+
return err
+
}
+
+
return err
+
}
+93
appview/state/spindlestream.go
···
+
package state
+
+
import (
+
"context"
+
"encoding/json"
+
"log/slog"
+
"time"
+
+
"github.com/bluesky-social/indigo/atproto/syntax"
+
"tangled.sh/tangled.sh/core/api/tangled"
+
"tangled.sh/tangled.sh/core/appview/cache"
+
"tangled.sh/tangled.sh/core/appview/config"
+
"tangled.sh/tangled.sh/core/appview/db"
+
ec "tangled.sh/tangled.sh/core/eventconsumer"
+
"tangled.sh/tangled.sh/core/eventconsumer/cursor"
+
"tangled.sh/tangled.sh/core/log"
+
"tangled.sh/tangled.sh/core/rbac"
+
)
+
+
func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) {
+
spindles, err := db.GetSpindles(d)
+
if err != nil {
+
return nil, err
+
}
+
+
srcs := make(map[ec.Source]struct{})
+
for _, s := range spindles {
+
src := ec.NewSpindleSource(s.Instance)
+
srcs[src] = struct{}{}
+
}
+
+
logger := log.New("spindlestream")
+
cache := cache.New(c.Redis.Addr)
+
cursorStore := cursor.NewRedisCursorStore(cache)
+
+
cfg := ec.ConsumerConfig{
+
Sources: srcs,
+
ProcessFunc: spindleIngester(ctx, logger, d),
+
RetryInterval: c.Spindlestream.RetryInterval,
+
MaxRetryInterval: c.Spindlestream.MaxRetryInterval,
+
ConnectionTimeout: c.Spindlestream.ConnectionTimeout,
+
WorkerCount: c.Spindlestream.WorkerCount,
+
QueueSize: c.Spindlestream.QueueSize,
+
Logger: logger,
+
Dev: c.Core.Dev,
+
CursorStore: &cursorStore,
+
}
+
+
return ec.NewConsumer(cfg), nil
+
}
+
+
func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc {
+
return func(ctx context.Context, source ec.Source, msg ec.Message) error {
+
switch msg.Nsid {
+
case tangled.PipelineStatusNSID:
+
return ingestPipelineStatus(ctx, logger, d, source, msg)
+
}
+
+
return nil
+
}
+
}
+
+
func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {
+
var record tangled.PipelineStatus
+
err := json.Unmarshal(msg.EventJson, &record)
+
if err != nil {
+
return err
+
}
+
+
pipelineUri, err := syntax.ParseATURI(record.Pipeline)
+
if err != nil {
+
return err
+
}
+
+
exitCode := 0
+
if record.ExitCode != nil {
+
exitCode = int(*record.ExitCode)
+
}
+
+
status := db.PipelineStatus{
+
Spindle: source.Key(),
+
Rkey: msg.Rkey,
+
PipelineKnot: pipelineUri.Authority().String(),
+
PipelineRkey: pipelineUri.RecordKey().String(),
+
Created: time.Now(),
+
Workflow: record.Workflow,
+
Status: record.Status,
+
Error: record.Error,
+
ExitCode: exitCode,
+
}
+
+
return db.AddPipelineStatus(d, status)
+
}
+26 -14
appview/state/state.go
···
"tangled.sh/tangled.sh/core/appview/oauth"
"tangled.sh/tangled.sh/core/appview/pages"
"tangled.sh/tangled.sh/core/appview/reporesolver"
+
"tangled.sh/tangled.sh/core/eventconsumer"
"tangled.sh/tangled.sh/core/jetstream"
"tangled.sh/tangled.sh/core/knotclient"
"tangled.sh/tangled.sh/core/rbac"
)
type State struct {
-
db *db.DB
-
oauth *oauth.OAuth
-
enforcer *rbac.Enforcer
-
tidClock syntax.TIDClock
-
pages *pages.Pages
-
sess *session.SessionStore
-
idResolver *idresolver.Resolver
-
posthog posthog.Client
-
jc *jetstream.JetstreamClient
-
config *config.Config
-
repoResolver *reporesolver.RepoResolver
-
knotstream *knotclient.EventConsumer
+
db *db.DB
+
oauth *oauth.OAuth
+
enforcer *rbac.Enforcer
+
tidClock syntax.TIDClock
+
pages *pages.Pages
+
sess *session.SessionStore
+
idResolver *idresolver.Resolver
+
posthog posthog.Client
+
jc *jetstream.JetstreamClient
+
config *config.Config
+
repoResolver *reporesolver.RepoResolver
+
knotstream *eventconsumer.Consumer
+
spindlestream *eventconsumer.Consumer
}
func Make(ctx context.Context, config *config.Config) (*State, error) {
···
return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
}
-
knotstream, err := KnotstreamConsumer(ctx, config, d, enforcer, posthog)
+
knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
if err != nil {
return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
}
knotstream.Start(ctx)
+
spindlestream, err := Spindlestream(ctx, config, d, enforcer)
+
if err != nil {
+
return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
+
}
+
spindlestream.Start(ctx)
+
state := &State{
d,
oauth,
···
config,
repoResolver,
knotstream,
+
spindlestream,
}
return state, nil
···
}
// add this knot to knotstream
-
go s.knotstream.AddSource(context.Background(), knotclient.EventSource{domain})
+
go s.knotstream.AddSource(
+
context.Background(),
+
eventconsumer.NewKnotSource(domain),
+
)
w.Write([]byte("check success"))
}