1package state
2
import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"strings"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"tangled.sh/tangled.sh/core/api/tangled"
	"tangled.sh/tangled.sh/core/appview/cache"
	"tangled.sh/tangled.sh/core/appview/config"
	"tangled.sh/tangled.sh/core/appview/db"
	ec "tangled.sh/tangled.sh/core/eventconsumer"
	"tangled.sh/tangled.sh/core/eventconsumer/cursor"
	"tangled.sh/tangled.sh/core/log"
	"tangled.sh/tangled.sh/core/rbac"
	spindle "tangled.sh/tangled.sh/core/spindle/models"
)
21
22func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) {
23 spindles, err := db.GetSpindles(
24 d,
25 db.FilterIsNot("verified", "null"),
26 )
27 if err != nil {
28 return nil, err
29 }
30
31 srcs := make(map[ec.Source]struct{})
32 for _, s := range spindles {
33 src := ec.NewSpindleSource(s.Instance)
34 srcs[src] = struct{}{}
35 }
36
37 logger := log.New("spindlestream")
38 cache := cache.New(c.Redis.Addr)
39 cursorStore := cursor.NewRedisCursorStore(cache)
40
41 cfg := ec.ConsumerConfig{
42 Sources: srcs,
43 ProcessFunc: spindleIngester(ctx, logger, d),
44 RetryInterval: c.Spindlestream.RetryInterval,
45 MaxRetryInterval: c.Spindlestream.MaxRetryInterval,
46 ConnectionTimeout: c.Spindlestream.ConnectionTimeout,
47 WorkerCount: c.Spindlestream.WorkerCount,
48 QueueSize: c.Spindlestream.QueueSize,
49 Logger: logger,
50 Dev: c.Core.Dev,
51 CursorStore: &cursorStore,
52 }
53
54 return ec.NewConsumer(cfg), nil
55}
56
57func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc {
58 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
59 switch msg.Nsid {
60 case tangled.PipelineStatusNSID:
61 return ingestPipelineStatus(ctx, logger, d, source, msg)
62 }
63
64 return nil
65 }
66}
67
68func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {
69 var record tangled.PipelineStatus
70 err := json.Unmarshal(msg.EventJson, &record)
71 if err != nil {
72 return err
73 }
74
75 pipelineUri, err := syntax.ParseATURI(record.Pipeline)
76 if err != nil {
77 return err
78 }
79
80 exitCode := 0
81 if record.ExitCode != nil {
82 exitCode = int(*record.ExitCode)
83 }
84
85 // pick the record creation time if possible, or use time.Now
86 created := time.Now()
87 if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) {
88 created = t
89 }
90
91 status := db.PipelineStatus{
92 Spindle: source.Key(),
93 Rkey: msg.Rkey,
94 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"),
95 PipelineRkey: pipelineUri.RecordKey().String(),
96 Created: created,
97 Workflow: record.Workflow,
98 Status: spindle.StatusKind(record.Status),
99 Error: record.Error,
100 ExitCode: exitCode,
101 }
102
103 return db.AddPipelineStatus(d, status)
104}