forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db 2 3import ( 4 "fmt" 5 "time" 6 7 "tangled.sh/tangled.sh/core/api/tangled" 8 "tangled.sh/tangled.sh/core/knotserver/notifier" 9) 10 11type PipelineStatus string 12 13var ( 14 PipelinePending PipelineStatus = "pending" 15 PipelineRunning PipelineStatus = "running" 16 PipelineFailed PipelineStatus = "failed" 17 PipelineTimeout PipelineStatus = "timeout" 18 PipelineCancelled PipelineStatus = "cancelled" 19 PipelineSuccess PipelineStatus = "success" 20) 21 22type Pipeline struct { 23 Rkey string `json:"rkey"` 24 Knot string `json:"knot"` 25 Status PipelineStatus `json:"status"` 26 27 // only if Failed 28 Error string `json:"error"` 29 ExitCode int `json:"exit_code"` 30 31 StartedAt time.Time `json:"started_at"` 32 UpdatedAt time.Time `json:"updated_at"` 33 FinishedAt time.Time `json:"finished_at"` 34} 35 36func (p Pipeline) AsRecord() *tangled.PipelineStatus { 37 exitCode64 := int64(p.ExitCode) 38 finishedAt := p.FinishedAt.String() 39 40 return &tangled.PipelineStatus{ 41 Pipeline: fmt.Sprintf("at://%s/%s", p.Knot, p.Rkey), 42 Status: string(p.Status), 43 44 ExitCode: &exitCode64, 45 Error: &p.Error, 46 47 StartedAt: p.StartedAt.String(), 48 UpdatedAt: p.UpdatedAt.String(), 49 FinishedAt: &finishedAt, 50 } 51} 52 53func pipelineAtUri(rkey, knot string) string { 54 return fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineStatusNSID, knot, rkey) 55} 56 57func (db *DB) CreatePipeline(rkey, knot string, n *notifier.Notifier) error { 58 _, err := db.Exec(` 59 insert into pipelines (at_uri, status) 60 values (?, ?) 61 `, pipelineAtUri(rkey, knot), PipelinePending) 62 63 if err != nil { 64 return err 65 } 66 n.NotifyAll() 67 return nil 68} 69 70func (db *DB) MarkPipelineRunning(rkey, knot string, n *notifier.Notifier) error { 71 _, err := db.Exec(` 72 update pipelines 73 set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 74 where at_uri = ? 75 `, PipelineRunning, pipelineAtUri(rkey, knot)) 76 77 if err != nil { 78 return err 79 } 80 n.NotifyAll() 81 return nil 82} 83 84func (db *DB) MarkPipelineFailed(rkey, knot string, exitCode int, errorMsg string, n *notifier.Notifier) error { 85 _, err := db.Exec(` 86 update pipelines 87 set status = ?, 88 exit_code = ?, 89 error = ?, 90 updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), 91 finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 92 where at_uri = ? 93 `, PipelineFailed, exitCode, errorMsg, pipelineAtUri(rkey, knot)) 94 if err != nil { 95 return err 96 } 97 n.NotifyAll() 98 return nil 99} 100 101func (db *DB) MarkPipelineTimeout(rkey, knot string, n *notifier.Notifier) error { 102 _, err := db.Exec(` 103 update pipelines 104 set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 105 where at_uri = ? 106 `, PipelineTimeout, pipelineAtUri(rkey, knot)) 107 if err != nil { 108 return err 109 } 110 n.NotifyAll() 111 return nil 112} 113 114func (db *DB) MarkPipelineSuccess(rkey, knot string, n *notifier.Notifier) error { 115 _, err := db.Exec(` 116 update pipelines 117 set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), 118 finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 119 where at_uri = ? 120 `, PipelineSuccess, pipelineAtUri(rkey, knot)) 121 122 if err != nil { 123 return err 124 } 125 n.NotifyAll() 126 return nil 127} 128 129func (db *DB) GetPipeline(rkey, knot string) (Pipeline, error) { 130 var p Pipeline 131 err := db.QueryRow(` 132 select rkey, status, error, exit_code, started_at, updated_at, finished_at 133 from pipelines 134 where at_uri = ? 135 `, pipelineAtUri(rkey, knot)).Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt) 136 return p, err 137} 138 139func (db *DB) GetPipelines(cursor string) ([]Pipeline, error) { 140 whereClause := "" 141 args := []any{} 142 if cursor != "" { 143 whereClause = "where rkey > ?" 144 args = append(args, cursor) 145 } 146 147 query := fmt.Sprintf(` 148 select rkey, status, error, exit_code, started_at, updated_at, finished_at 149 from pipelines 150 %s 151 order by rkey asc 152 limit 100 153 `, whereClause) 154 155 rows, err := db.Query(query, args...) 156 if err != nil { 157 return nil, err 158 } 159 defer rows.Close() 160 161 var pipelines []Pipeline 162 for rows.Next() { 163 var p Pipeline 164 rows.Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt) 165 pipelines = append(pipelines, p) 166 } 167 168 if err := rows.Err(); err != nil { 169 return nil, err 170 } 171 172 return pipelines, nil 173}