forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "strings"
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/syntax"
12 "tangled.org/core/api/tangled"
13 "tangled.org/core/appview/cache"
14 "tangled.org/core/appview/config"
15 "tangled.org/core/appview/db"
16 "tangled.org/core/appview/models"
17 ec "tangled.org/core/eventconsumer"
18 "tangled.org/core/eventconsumer/cursor"
19 "tangled.org/core/log"
20 "tangled.org/core/rbac"
21 spindle "tangled.org/core/spindle/models"
22)
23
24func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) {
25 spindles, err := db.GetSpindles(
26 d,
27 db.FilterIsNot("verified", "null"),
28 )
29 if err != nil {
30 return nil, err
31 }
32
33 srcs := make(map[ec.Source]struct{})
34 for _, s := range spindles {
35 src := ec.NewSpindleSource(s.Instance)
36 srcs[src] = struct{}{}
37 }
38
39 logger := log.New("spindlestream")
40 cache := cache.New(c.Redis.Addr)
41 cursorStore := cursor.NewRedisCursorStore(cache)
42
43 cfg := ec.ConsumerConfig{
44 Sources: srcs,
45 ProcessFunc: spindleIngester(ctx, logger, d),
46 RetryInterval: c.Spindlestream.RetryInterval,
47 MaxRetryInterval: c.Spindlestream.MaxRetryInterval,
48 ConnectionTimeout: c.Spindlestream.ConnectionTimeout,
49 WorkerCount: c.Spindlestream.WorkerCount,
50 QueueSize: c.Spindlestream.QueueSize,
51 Logger: logger,
52 Dev: c.Core.Dev,
53 CursorStore: &cursorStore,
54 }
55
56 return ec.NewConsumer(cfg), nil
57}
58
59func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc {
60 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
61 switch msg.Nsid {
62 case tangled.PipelineStatusNSID:
63 return ingestPipelineStatus(ctx, logger, d, source, msg)
64 }
65
66 return nil
67 }
68}
69
70func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error {
71 var record tangled.PipelineStatus
72 err := json.Unmarshal(msg.EventJson, &record)
73 if err != nil {
74 return err
75 }
76
77 pipelineUri, err := syntax.ParseATURI(record.Pipeline)
78 if err != nil {
79 return err
80 }
81
82 exitCode := 0
83 if record.ExitCode != nil {
84 exitCode = int(*record.ExitCode)
85 }
86
87 // pick the record creation time if possible, or use time.Now
88 created := time.Now()
89 if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) {
90 created = t
91 }
92
93 status := models.PipelineStatus{
94 Spindle: source.Key(),
95 Rkey: msg.Rkey,
96 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"),
97 PipelineRkey: pipelineUri.RecordKey().String(),
98 Created: created,
99 Workflow: record.Workflow,
100 Status: spindle.StatusKind(record.Status),
101 Error: record.Error,
102 ExitCode: exitCode,
103 }
104
105 err = db.AddPipelineStatus(d, status)
106 if err != nil {
107 return fmt.Errorf("failed to add pipeline status: %w", err)
108 }
109
110 return nil
111}