forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
at master 3.0 kB view raw
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "strings" 9 "time" 10 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 "tangled.org/core/api/tangled" 13 "tangled.org/core/appview/cache" 14 "tangled.org/core/appview/config" 15 "tangled.org/core/appview/db" 16 "tangled.org/core/appview/models" 17 ec "tangled.org/core/eventconsumer" 18 "tangled.org/core/eventconsumer/cursor" 19 "tangled.org/core/log" 20 "tangled.org/core/orm" 21 "tangled.org/core/rbac" 22 spindle "tangled.org/core/spindle/models" 23) 24 25func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) { 26 logger := log.FromContext(ctx) 27 logger = log.SubLogger(logger, "spindlestream") 28 29 spindles, err := db.GetSpindles( 30 d, 31 orm.FilterIsNot("verified", "null"), 32 ) 33 if err != nil { 34 return nil, err 35 } 36 37 srcs := make(map[ec.Source]struct{}) 38 for _, s := range spindles { 39 src := ec.NewSpindleSource(s.Instance) 40 srcs[src] = struct{}{} 41 } 42 43 cache := cache.New(c.Redis.Addr) 44 cursorStore := cursor.NewRedisCursorStore(cache) 45 46 cfg := ec.ConsumerConfig{ 47 Sources: srcs, 48 ProcessFunc: spindleIngester(ctx, logger, d), 49 RetryInterval: c.Spindlestream.RetryInterval, 50 MaxRetryInterval: c.Spindlestream.MaxRetryInterval, 51 ConnectionTimeout: c.Spindlestream.ConnectionTimeout, 52 WorkerCount: c.Spindlestream.WorkerCount, 53 QueueSize: c.Spindlestream.QueueSize, 54 Logger: logger, 55 Dev: c.Core.Dev, 56 CursorStore: &cursorStore, 57 } 58 59 return ec.NewConsumer(cfg), nil 60} 61 62func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc { 63 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 64 switch msg.Nsid { 65 case tangled.PipelineStatusNSID: 66 return ingestPipelineStatus(ctx, logger, d, source, msg) 67 } 68 69 return nil 70 } 71} 72 73func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error { 74 var record tangled.PipelineStatus 75 err := json.Unmarshal(msg.EventJson, &record) 76 if err != nil { 77 return err 78 } 79 80 pipelineUri, err := syntax.ParseATURI(record.Pipeline) 81 if err != nil { 82 return err 83 } 84 85 exitCode := 0 86 if record.ExitCode != nil { 87 exitCode = int(*record.ExitCode) 88 } 89 90 // pick the record creation time if possible, or use time.Now 91 created := time.Now() 92 if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) { 93 created = t 94 } 95 96 status := models.PipelineStatus{ 97 Spindle: source.Key(), 98 Rkey: msg.Rkey, 99 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"), 100 PipelineRkey: pipelineUri.RecordKey().String(), 101 Created: created, 102 Workflow: record.Workflow, 103 Status: spindle.StatusKind(record.Status), 104 Error: record.Error, 105 ExitCode: exitCode, 106 } 107 108 err = db.AddPipelineStatus(d, status) 109 if err != nil { 110 return fmt.Errorf("failed to add pipeline status: %w", err) 111 } 112 113 return nil 114}