1package state
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 "tangled.sh/tangled.sh/core/api/tangled"
13 "tangled.sh/tangled.sh/core/appview/cache"
14 "tangled.sh/tangled.sh/core/appview/config"
15 "tangled.sh/tangled.sh/core/appview/db"
16 ec "tangled.sh/tangled.sh/core/eventconsumer"
17 "tangled.sh/tangled.sh/core/eventconsumer/cursor"
18 "tangled.sh/tangled.sh/core/log"
19 "tangled.sh/tangled.sh/core/rbac"
20 spindle "tangled.sh/tangled.sh/core/spindle/models"
21)
22
23func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) {
24 spindles, err := db.GetSpindles(
25 d,
26 db.FilterIsNot("verified", "null"),
27 )
28 if err != nil {
29 return nil, err
30 }
31
32 srcs := make(map[ec.Source]struct{})
33 for _, s := range spindles {
34 src := ec.NewSpindleSource(s.Instance)
35 srcs[src] = struct{}{}
36 }
37
38 logger := log.New("spindlestream")
39 cache := cache.New(c.Redis.Addr)
40 cursorStore := cursor.NewRedisCursorStore(cache)
41
42 cfg := ec.ConsumerConfig{
43 Sources: srcs,
44 ProcessFunc: spindleIngester(ctx, logger, d),
45 RetryInterval: c.Spindlestream.RetryInterval,
46 MaxRetryInterval: c.Spindlestream.MaxRetryInterval,
47 ConnectionTimeout: c.Spindlestream.ConnectionTimeout,
48 WorkerCount: c.Spindlestream.WorkerCount,
49 QueueSize: c.Spindlestream.QueueSize,
50 Logger: logger,
51 Dev: c.Core.Dev,
52 CursorStore: &cursorStore,
53 }
54
55 return ec.NewConsumer(cfg), nil
56}
57
58func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc {
59 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
60 switch msg.Nsid {
61 case tangled.PipelineStatusNSID:
62 return ingestPipelineStatus(ctx, logger, d, source, msg)
63 }
64
65 return nil
66 }
67}
68
69func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {
70 var record tangled.PipelineStatus
71 err := json.Unmarshal(msg.EventJson, &record)
72 if err != nil {
73 return err
74 }
75
76 pipelineUri, err := syntax.ParseATURI(record.Pipeline)
77 if err != nil {
78 return err
79 }
80
81 exitCode := 0
82 if record.ExitCode != nil {
83 exitCode = int(*record.ExitCode)
84 }
85
86 // pick the record creation time if possible, or use time.Now
87 created := time.Now()
88 if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) {
89 created = t
90 }
91
92 status := db.PipelineStatus{
93 Spindle: source.Key(),
94 Rkey: msg.Rkey,
95 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"),
96 PipelineRkey: pipelineUri.RecordKey().String(),
97 Created: created,
98 Workflow: record.Workflow,
99 Status: spindle.StatusKind(record.Status),
100 Error: record.Error,
101 ExitCode: exitCode,
102 }
103
104 err = db.AddPipelineStatus(d, status)
105 if err != nil {
106 return fmt.Errorf("failed to add pipeline status: %w", err)
107 }
108
109 return nil
110}