forked from tangled.org/core
this repo has no description
at master 3.0 kB view raw
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "strings" 9 "time" 10 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 "tangled.sh/tangled.sh/core/api/tangled" 13 "tangled.sh/tangled.sh/core/appview/cache" 14 "tangled.sh/tangled.sh/core/appview/config" 15 "tangled.sh/tangled.sh/core/appview/db" 16 ec "tangled.sh/tangled.sh/core/eventconsumer" 17 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 18 "tangled.sh/tangled.sh/core/log" 19 "tangled.sh/tangled.sh/core/rbac" 20 spindle "tangled.sh/tangled.sh/core/spindle/models" 21) 22 23func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) { 24 spindles, err := db.GetSpindles( 25 d, 26 db.FilterIsNot("verified", "null"), 27 ) 28 if err != nil { 29 return nil, err 30 } 31 32 srcs := make(map[ec.Source]struct{}) 33 for _, s := range spindles { 34 src := ec.NewSpindleSource(s.Instance) 35 srcs[src] = struct{}{} 36 } 37 38 logger := log.New("spindlestream") 39 cache := cache.New(c.Redis.Addr) 40 cursorStore := cursor.NewRedisCursorStore(cache) 41 42 cfg := ec.ConsumerConfig{ 43 Sources: srcs, 44 ProcessFunc: spindleIngester(ctx, logger, d), 45 RetryInterval: c.Spindlestream.RetryInterval, 46 MaxRetryInterval: c.Spindlestream.MaxRetryInterval, 47 ConnectionTimeout: c.Spindlestream.ConnectionTimeout, 48 WorkerCount: c.Spindlestream.WorkerCount, 49 QueueSize: c.Spindlestream.QueueSize, 50 Logger: logger, 51 Dev: c.Core.Dev, 52 CursorStore: &cursorStore, 53 } 54 55 return ec.NewConsumer(cfg), nil 56} 57 58func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc { 59 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 60 switch msg.Nsid { 61 case tangled.PipelineStatusNSID: 62 return ingestPipelineStatus(ctx, logger, d, source, msg) 63 } 64 65 return nil 66 } 67} 68 69func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error { 70 var record tangled.PipelineStatus 71 err := json.Unmarshal(msg.EventJson, &record) 72 if err != nil { 73 return err 74 } 75 76 pipelineUri, err := syntax.ParseATURI(record.Pipeline) 77 if err != nil { 78 return err 79 } 80 81 exitCode := 0 82 if record.ExitCode != nil { 83 exitCode = int(*record.ExitCode) 84 } 85 86 // pick the record creation time if possible, or use time.Now 87 created := time.Now() 88 if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) { 89 created = t 90 } 91 92 status := db.PipelineStatus{ 93 Spindle: source.Key(), 94 Rkey: msg.Rkey, 95 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"), 96 PipelineRkey: pipelineUri.RecordKey().String(), 97 Created: created, 98 Workflow: record.Workflow, 99 Status: spindle.StatusKind(record.Status), 100 Error: record.Error, 101 ExitCode: exitCode, 102 } 103 104 err = db.AddPipelineStatus(d, status) 105 if err != nil { 106 return fmt.Errorf("failed to add pipeline status: %w", err) 107 } 108 109 return nil 110}