forked from tangled.org/core
this repo has no description
at test-ci 2.9 kB view raw
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "log/slog" 7 "strings" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "tangled.sh/tangled.sh/core/api/tangled" 12 "tangled.sh/tangled.sh/core/appview/cache" 13 "tangled.sh/tangled.sh/core/appview/config" 14 "tangled.sh/tangled.sh/core/appview/db" 15 ec "tangled.sh/tangled.sh/core/eventconsumer" 16 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 17 "tangled.sh/tangled.sh/core/log" 18 "tangled.sh/tangled.sh/core/rbac" 19 spindle "tangled.sh/tangled.sh/core/spindle/models" 20) 21 22func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) { 23 spindles, err := db.GetSpindles( 24 d, 25 db.FilterIsNot("verified", "null"), 26 ) 27 if err != nil { 28 return nil, err 29 } 30 31 srcs := make(map[ec.Source]struct{}) 32 for _, s := range spindles { 33 src := ec.NewSpindleSource(s.Instance) 34 srcs[src] = struct{}{} 35 } 36 37 logger := log.New("spindlestream") 38 cache := cache.New(c.Redis.Addr) 39 cursorStore := cursor.NewRedisCursorStore(cache) 40 41 cfg := ec.ConsumerConfig{ 42 Sources: srcs, 43 ProcessFunc: spindleIngester(ctx, logger, d), 44 RetryInterval: c.Spindlestream.RetryInterval, 45 MaxRetryInterval: c.Spindlestream.MaxRetryInterval, 46 ConnectionTimeout: c.Spindlestream.ConnectionTimeout, 47 WorkerCount: c.Spindlestream.WorkerCount, 48 QueueSize: c.Spindlestream.QueueSize, 49 Logger: logger, 50 Dev: c.Core.Dev, 51 CursorStore: &cursorStore, 52 } 53 54 return ec.NewConsumer(cfg), nil 55} 56 57func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc { 58 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 59 switch msg.Nsid { 60 case tangled.PipelineStatusNSID: 61 return ingestPipelineStatus(ctx, logger, d, source, msg) 62 } 63 64 return nil 65 } 66} 67 68func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error { 69 var record tangled.PipelineStatus 70 err := json.Unmarshal(msg.EventJson, &record) 71 if err != nil { 72 return err 73 } 74 75 pipelineUri, err := syntax.ParseATURI(record.Pipeline) 76 if err != nil { 77 return err 78 } 79 80 exitCode := 0 81 if record.ExitCode != nil { 82 exitCode = int(*record.ExitCode) 83 } 84 85 // pick the record creation time if possible, or use time.Now 86 created := time.Now() 87 if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) { 88 created = t 89 } 90 91 status := db.PipelineStatus{ 92 Spindle: source.Key(), 93 Rkey: msg.Rkey, 94 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"), 95 PipelineRkey: pipelineUri.RecordKey().String(), 96 Created: created, 97 Workflow: record.Workflow, 98 Status: spindle.StatusKind(record.Status), 99 Error: record.Error, 100 ExitCode: exitCode, 101 } 102 103 return db.AddPipelineStatus(d, status) 104}