forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db 2 3import ( 4 "fmt" 5 "time" 6 7 "tangled.sh/tangled.sh/core/api/tangled" 8 "tangled.sh/tangled.sh/core/notifier" 9) 10 11type PipelineRunStatus string 12 13var ( 14 PipelinePending PipelineRunStatus = "pending" 15 PipelineRunning PipelineRunStatus = "running" 16 PipelineFailed PipelineRunStatus = "failed" 17 PipelineTimeout PipelineRunStatus = "timeout" 18 PipelineCancelled PipelineRunStatus = "cancelled" 19 PipelineSuccess PipelineRunStatus = "success" 20) 21 22type PipelineStatus struct { 23 Rkey string `json:"rkey"` 24 Pipeline string `json:"pipeline"` 25 Status PipelineRunStatus `json:"status"` 26 27 // only if Failed 28 Error string `json:"error"` 29 ExitCode int `json:"exit_code"` 30 31 StartedAt time.Time `json:"started_at"` 32 UpdatedAt time.Time `json:"updated_at"` 33 FinishedAt time.Time `json:"finished_at"` 34} 35 36func (p PipelineStatus) AsRecord() *tangled.PipelineStatus { 37 exitCode64 := int64(p.ExitCode) 38 finishedAt := p.FinishedAt.String() 39 40 return &tangled.PipelineStatus{ 41 LexiconTypeID: tangled.PipelineStatusNSID, 42 Pipeline: p.Pipeline, 43 Status: string(p.Status), 44 45 ExitCode: &exitCode64, 46 Error: &p.Error, 47 48 StartedAt: p.StartedAt.String(), 49 UpdatedAt: p.UpdatedAt.String(), 50 FinishedAt: &finishedAt, 51 } 52} 53 54func pipelineAtUri(rkey, knot string) string { 55 return fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineStatusNSID, knot, rkey) 56} 57 58func (db *DB) CreatePipeline(rkey, pipeline string, n *notifier.Notifier) error { 59 _, err := db.Exec(` 60 insert into pipeline_status (rkey, status, pipeline) 61 values (?, ?, ?) 62 `, rkey, PipelinePending, pipeline) 63 64 if err != nil { 65 return err 66 } 67 n.NotifyAll() 68 return nil 69} 70 71func (db *DB) MarkPipelineRunning(rkey string, n *notifier.Notifier) error { 72 _, err := db.Exec(` 73 update pipeline_status 74 set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 75 where rkey = ? 76 `, PipelineRunning, rkey) 77 78 if err != nil { 79 return err 80 } 81 n.NotifyAll() 82 return nil 83} 84 85func (db *DB) MarkPipelineFailed(rkey string, exitCode int, errorMsg string, n *notifier.Notifier) error { 86 _, err := db.Exec(` 87 update pipeline_status 88 set status = ?, 89 exit_code = ?, 90 error = ?, 91 updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), 92 finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 93 where rkey = ? 94 `, PipelineFailed, exitCode, errorMsg, rkey) 95 if err != nil { 96 return err 97 } 98 n.NotifyAll() 99 return nil 100} 101 102func (db *DB) MarkPipelineTimeout(rkey string, n *notifier.Notifier) error { 103 _, err := db.Exec(` 104 update pipeline_status 105 set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 106 where rkey = ? 107 `, PipelineTimeout, rkey) 108 if err != nil { 109 return err 110 } 111 n.NotifyAll() 112 return nil 113} 114 115func (db *DB) MarkPipelineSuccess(rkey string, n *notifier.Notifier) error { 116 _, err := db.Exec(` 117 update pipeline_status 118 set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), 119 finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 120 where rkey = ? 121 `, PipelineSuccess, rkey) 122 123 if err != nil { 124 return err 125 } 126 n.NotifyAll() 127 return nil 128} 129 130func (db *DB) GetPipelineStatus(rkey string) (PipelineStatus, error) { 131 var p PipelineStatus 132 err := db.QueryRow(` 133 select rkey, status, error, exit_code, started_at, updated_at, finished_at 134 from pipelines 135 where rkey = ? 136 `, rkey).Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt) 137 return p, err 138} 139 140func (db *DB) GetPipelineStatusAsRecords(cursor string) ([]PipelineStatus, error) { 141 whereClause := "" 142 args := []any{} 143 if cursor != "" { 144 whereClause = "where rkey > ?" 145 args = append(args, cursor) 146 } 147 148 query := fmt.Sprintf(` 149 select rkey, status, error, exit_code, started_at, updated_at, finished_at 150 from pipeline_status 151 %s 152 order by rkey asc 153 limit 100 154 `, whereClause) 155 156 rows, err := db.Query(query, args...) 157 if err != nil { 158 return nil, err 159 } 160 defer rows.Close() 161 162 var pipelines []PipelineStatus 163 for rows.Next() { 164 var p PipelineStatus 165 rows.Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt) 166 pipelines = append(pipelines, p) 167 } 168 169 if err := rows.Err(); err != nil { 170 return nil, err 171 } 172 173 records := []*tangled.PipelineStatus{} 174 for _, p := range pipelines { 175 records = append(records, p.AsRecord()) 176 } 177 178 return pipelines, nil 179}