forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "strings" 9 "time" 10 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 "tangled.org/core/api/tangled" 13 "tangled.org/core/appview/cache" 14 "tangled.org/core/appview/config" 15 "tangled.org/core/appview/db" 16 "tangled.org/core/appview/models" 17 ec "tangled.org/core/eventconsumer" 18 "tangled.org/core/eventconsumer/cursor" 19 "tangled.org/core/log" 20 "tangled.org/core/rbac" 21 spindle "tangled.org/core/spindle/models" 22) 23 24func Spindlestream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer) (*ec.Consumer, error) { 25 logger := log.FromContext(ctx) 26 logger = log.SubLogger(logger, "spindlestream") 27 28 spindles, err := db.GetSpindles( 29 d, 30 db.FilterIsNot("verified", "null"), 31 ) 32 if err != nil { 33 return nil, err 34 } 35 36 srcs := make(map[ec.Source]struct{}) 37 for _, s := range spindles { 38 src := ec.NewSpindleSource(s.Instance) 39 srcs[src] = struct{}{} 40 } 41 42 cache := cache.New(c.Redis.Addr) 43 cursorStore := cursor.NewRedisCursorStore(cache) 44 45 cfg := ec.ConsumerConfig{ 46 Sources: srcs, 47 ProcessFunc: spindleIngester(ctx, logger, d), 48 RetryInterval: c.Spindlestream.RetryInterval, 49 MaxRetryInterval: c.Spindlestream.MaxRetryInterval, 50 ConnectionTimeout: c.Spindlestream.ConnectionTimeout, 51 WorkerCount: c.Spindlestream.WorkerCount, 52 QueueSize: c.Spindlestream.QueueSize, 53 Logger: logger, 54 Dev: c.Core.Dev, 55 CursorStore: &cursorStore, 56 } 57 58 return ec.NewConsumer(cfg), nil 59} 60 61func spindleIngester(ctx context.Context, logger *slog.Logger, d *db.DB) ec.ProcessFunc { 62 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 63 switch msg.Nsid { 64 case tangled.PipelineStatusNSID: 65 return ingestPipelineStatus(ctx, logger, d, source, msg) 66 } 67 68 return nil 69 } 70} 71 72func ingestPipelineStatus(ctx context.Context, logger *slog.Logger, d *db.DB, source ec.Source, msg ec.Message) error { 73 var record tangled.PipelineStatus 74 err := json.Unmarshal(msg.EventJson, &record) 75 if err != nil { 76 return err 77 } 78 79 pipelineUri, err := syntax.ParseATURI(record.Pipeline) 80 if err != nil { 81 return err 82 } 83 84 exitCode := 0 85 if record.ExitCode != nil { 86 exitCode = int(*record.ExitCode) 87 } 88 89 // pick the record creation time if possible, or use time.Now 90 created := time.Now() 91 if t, err := time.Parse(time.RFC3339, record.CreatedAt); err == nil && created.After(t) { 92 created = t 93 } 94 95 status := models.PipelineStatus{ 96 Spindle: source.Key(), 97 Rkey: msg.Rkey, 98 PipelineKnot: strings.TrimPrefix(pipelineUri.Authority().String(), "did:web:"), 99 PipelineRkey: pipelineUri.RecordKey().String(), 100 Created: created, 101 Workflow: record.Workflow, 102 Status: spindle.StatusKind(record.Status), 103 Error: record.Error, 104 ExitCode: exitCode, 105 } 106 107 err = db.AddPipelineStatus(d, status) 108 if err != nil { 109 return fmt.Errorf("failed to add pipeline status: %w", err) 110 } 111 112 return nil 113}