forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "strings" 9 "time" 10 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 "tangled.org/core/api/tangled" 13 "tangled.org/core/appview/cache" 14 "tangled.org/core/appview/config" 15 "tangled.org/core/appview/db" 16 "tangled.org/core/appview/models" 17 ec "tangled.org/core/eventconsumer" 18 "tangled.org/core/eventconsumer/cursor" 19 "tangled.org/core/log" 20 "tangled.org/core/rbac" 21 spindle "tangled.org/core/spindle/models" 22) 23 24func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) { 25 spindles, err := db.GetSpindles( 26 d, 27 db.FilterIsNot("verified", "null"), 28 ) 29 if err != nil { 30 return nil, err 31 } 32 33 srcs := make(map[ec.Source]struct{}) 34 for _, s := range spindles { 35 src := ec.NewSpindleSource(s.Instance) 36 srcs[src] = struct{}{} 37 } 38 39 logger := log.New("spindlestream") 40 cache := cache.New(c.Redis.Addr) 41 cursorStore := cursor.NewRedisCursorStore(cache) 42 43 cfg := ec.ConsumerConfig{ 44 Sources: srcs, 45 ProcessFunc: spindleIngester(ctx, logger, d), 46 RetryInterval: c.Spindlestream.RetryInterval, 47 MaxRetryInterval: c.Spindlestream.MaxRetryInterval, 48 ConnectionTimeout: c.Spindlestream.ConnectionTimeout, 49 WorkerCount: c.Spindlestream.WorkerCount, 50 QueueSize: c.Spindlestream.QueueSize, 51 Logger: logger, 52 Dev: c.Core.Dev, 53 CursorStore: &cursorStore, 54 } 55 56 return ec.NewConsumer(cfg), nil 57} 58 59func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc { 60 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 61 switch msg.Nsid { 62 case tangled.PipelineStatusNSID: 63 return ingestPipelineStatus(ctx, logger, d, source, msg) 64 } 65 66 return nil 67 } 68} 69 70func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error { 71 var record tangled.PipelineStatus 72 err := json.Unmarshal(msg.EventJson, &record) 73 if err != nil { 74 return err 75 } 76 77 pipelineUri, err := syntax.ParseATURI(record.Pipeline) 78 if err != nil { 79 return err 80 } 81 82 exitCode := 0 83 if record.ExitCode != nil { 84 exitCode = int(*record.ExitCode) 85 } 86 87 // pick the record creation time if possible, or use time.Now 88 created := time.Now() 89 if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) { 90 created = t 91 } 92 93 status := models.PipelineStatus{ 94 Spindle: source.Key(), 95 Rkey: msg.Rkey, 96 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"), 97 PipelineRkey: pipelineUri.RecordKey().String(), 98 Created: created, 99 Workflow: record.Workflow, 100 Status: spindle.StatusKind(record.Status), 101 Error: record.Error, 102 ExitCode: exitCode, 103 } 104 105 err = db.AddPipelineStatus(d, status) 106 if err != nil { 107 return fmt.Errorf("failed to add pipeline status: %w", err) 108 } 109 110 return nil 111}