forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
package db

// NOTE(review): everything below the package clause is commented out, so this
// file currently contributes no declarations to package db. The reason it was
// disabled is not visible from this file; the code is kept commented out here.
//
// Two defects were corrected in the disabled code so that it is right if it is
// ever re-enabled:
//   - MarkPipelineRunning and MarkPipelineFailed passed rkey and the
//     last_update timestamp in the opposite order to their `?` placeholders.
//
// Open questions are marked inline with NOTE(review) and should be settled
// against the schema before this code is switched back on.

//
// import (
// 	"database/sql"
// 	"fmt"
// 	"time"
//
// 	"tangled.sh/tangled.sh/core/api/tangled"
// 	"tangled.sh/tangled.sh/core/notifier"
// )
//
// // PipelineRunStatus is the string stored in the status column.
// type PipelineRunStatus string
//
// var (
// 	PipelinePending   PipelineRunStatus = "pending"
// 	PipelineRunning   PipelineRunStatus = "running"
// 	PipelineFailed    PipelineRunStatus = "failed"
// 	PipelineTimeout   PipelineRunStatus = "timeout"
// 	PipelineCancelled PipelineRunStatus = "cancelled"
// 	PipelineSuccess   PipelineRunStatus = "success"
// )
//
// // PipelineStatus holds the values read back from a pipeline status row.
// type PipelineStatus struct {
// 	Rkey     string            `json:"rkey"`
// 	Pipeline string            `json:"pipeline"`
// 	Status   PipelineRunStatus `json:"status"`
//
// 	// only if Failed
// 	Error    string `json:"error"`
// 	ExitCode int    `json:"exit_code"`
//
// 	LastUpdate int64     `json:"last_update"`
// 	StartedAt  time.Time `json:"started_at"`
// 	UpdatedAt  time.Time `json:"updated_at"`
// 	FinishedAt time.Time `json:"finished_at"`
// }
//
// // AsRecord converts p into a *tangled.PipelineStatus record.
// //
// // ExitCode, Error and FinishedAt are always set to non-nil pointers, even
// // when p carries their zero values.
// //
// // NOTE(review): timestamps are rendered with time.Time.String(), whose output
// // is not RFC 3339, while the readers in this file parse RFC 3339. Confirm the
// // format the record is expected to carry.
// func (p PipelineStatus) AsRecord() *tangled.PipelineStatus {
// 	exitCode64 := int64(p.ExitCode)
// 	finishedAt := p.FinishedAt.String()
//
// 	return &tangled.PipelineStatus{
// 		LexiconTypeID: tangled.PipelineStatusNSID,
// 		Pipeline:      p.Pipeline,
// 		Status:        string(p.Status),
//
// 		ExitCode: &exitCode64,
// 		Error:    &p.Error,
//
// 		StartedAt:  p.StartedAt.String(),
// 		UpdatedAt:  p.UpdatedAt.String(),
// 		FinishedAt: &finishedAt,
// 	}
// }
//
// // pipelineAtUri formats "at://<NSID>/did:web:<knot>/<rkey>".
// //
// // NOTE(review): AT URIs are normally at://<authority>/<collection>/<rkey>;
// // here the NSID sits in the authority position and the DID in the collection
// // position. Confirm the intended order before relying on this helper.
// func pipelineAtUri(rkey, knot string) string {
// 	return fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineStatusNSID, knot, rkey)
// }
//
// // CreatePipeline inserts a pipeline_status row for rkey with status
// // PipelinePending and last_update set to the current Unix time in
// // nanoseconds, then calls n.NotifyAll(). The notifier is not called when the
// // insert fails.
// func (db *DB) CreatePipeline(rkey, pipeline string, n *notifier.Notifier) error {
// 	_, err := db.Exec(`
// 		insert into pipeline_status (rkey, status, pipeline, last_update)
// 		values (?, ?, ?, ?)
// 	`, rkey, PipelinePending, pipeline, time.Now().UnixNano())
//
// 	if err != nil {
// 		return err
// 	}
// 	n.NotifyAll()
// 	return nil
// }
//
// // MarkPipelineRunning sets the row for rkey to PipelineRunning, refreshes
// // updated_at and last_update, then calls n.NotifyAll().
// func (db *DB) MarkPipelineRunning(rkey string, n *notifier.Notifier) error {
// 	// Arguments follow placeholder order: status, last_update, rkey.
// 	_, err := db.Exec(`
// 		update pipeline_status
// 		set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'), last_update = ?
// 		where rkey = ?
// 	`, PipelineRunning, time.Now().UnixNano(), rkey)
//
// 	if err != nil {
// 		return err
// 	}
// 	n.NotifyAll()
// 	return nil
// }
//
// // MarkPipelineFailed sets the row for rkey to PipelineFailed, records
// // exitCode and errorMsg, refreshes updated_at, finished_at and last_update,
// // then calls n.NotifyAll().
// func (db *DB) MarkPipelineFailed(rkey string, exitCode int, errorMsg string, n *notifier.Notifier) error {
// 	// Arguments follow placeholder order: status, exit_code, error,
// 	// last_update, rkey.
// 	_, err := db.Exec(`
// 		update pipeline_status
// 		set status = ?,
// 		    exit_code = ?,
// 		    error = ?,
// 		    updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
// 		    finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
// 		    last_update = ?
// 		where rkey = ?
// 	`, PipelineFailed, exitCode, errorMsg, time.Now().UnixNano(), rkey)
// 	if err != nil {
// 		return err
// 	}
// 	n.NotifyAll()
// 	return nil
// }
//
// // MarkPipelineTimeout sets the row for rkey to PipelineTimeout and refreshes
// // updated_at, then calls n.NotifyAll().
// //
// // NOTE(review): unlike MarkPipelineFailed this touches neither finished_at
// // nor last_update. Confirm whether that is intended.
// func (db *DB) MarkPipelineTimeout(rkey string, n *notifier.Notifier) error {
// 	_, err := db.Exec(`
// 		update pipeline_status
// 		set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
// 		where rkey = ?
// 	`, PipelineTimeout, rkey)
// 	if err != nil {
// 		return err
// 	}
// 	n.NotifyAll()
// 	return nil
// }
//
// // MarkPipelineSuccess sets the row for rkey to PipelineSuccess and refreshes
// // updated_at and finished_at, then calls n.NotifyAll().
// //
// // NOTE(review): last_update is not touched here, unlike MarkPipelineRunning
// // and MarkPipelineFailed. Confirm whether that is intended.
// func (db *DB) MarkPipelineSuccess(rkey string, n *notifier.Notifier) error {
// 	_, err := db.Exec(`
// 		update pipeline_status
// 		set status = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
// 		finished_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
// 		where rkey = ?
// 	`, PipelineSuccess, rkey)
//
// 	if err != nil {
// 		return err
// 	}
// 	n.NotifyAll()
// 	return nil
// }
//
// // GetPipelineStatus loads a single row by rkey.
// //
// // NOTE(review): this reads from table `pipelines`, while every other
// // statement in this file uses `pipeline_status`. It also scans straight into
// // string/int/time.Time fields, whereas GetPipelineStatusAsRecords goes
// // through sql.Null* and string parsing for the same columns. Confirm both
// // against the schema.
// func (db *DB) GetPipelineStatus(rkey string) (PipelineStatus, error) {
// 	var p PipelineStatus
// 	err := db.QueryRow(`
// 		select rkey, status, error, exit_code, started_at, updated_at, finished_at
// 		from pipelines
// 		where rkey = ?
// 	`, rkey).Scan(&p.Rkey, &p.Status, &p.Error, &p.ExitCode, &p.StartedAt, &p.UpdatedAt, &p.FinishedAt)
// 	return p, err
// }
//
// // GetPipelineStatusAsRecords returns at most 100 pipeline_status rows ordered
// // by created_at ascending. When cursor is non-zero only rows with
// // created_at > cursor are returned.
// //
// // started_at and updated_at values that do not parse as RFC 3339 leave the
// // corresponding field at its zero value; no error is reported for them.
// //
// // NOTE(review): created_at is scanned into LastUpdate, the `pipeline` column
// // is not selected (so Pipeline stays empty), and CreatePipeline never writes
// // created_at (presumably a column default). Confirm against the schema.
// func (db *DB) GetPipelineStatusAsRecords(cursor int64) ([]PipelineStatus, error) {
// 	whereClause := ""
// 	args := []any{}
// 	if cursor != 0 {
// 		whereClause = "where created_at > ?"
// 		args = append(args, cursor)
// 	}
//
// 	query := fmt.Sprintf(`
// 		select rkey, status, error, exit_code, created_at, started_at, updated_at, finished_at
// 		from pipeline_status
// 		%s
// 		order by created_at asc
// 		limit 100
// 	`, whereClause)
//
// 	rows, err := db.Query(query, args...)
// 	if err != nil {
// 		return nil, err
// 	}
// 	defer rows.Close()
//
// 	var pipelines []PipelineStatus
// 	for rows.Next() {
// 		var p PipelineStatus
// 		var pipelineError sql.NullString
// 		var exitCode sql.NullInt64
// 		var startedAt, updatedAt string
// 		var finishedAt sql.NullTime
//
// 		err := rows.Scan(&p.Rkey, &p.Status, &pipelineError, &exitCode, &p.LastUpdate, &startedAt, &updatedAt, &finishedAt)
// 		if err != nil {
// 			return nil, err
// 		}
//
// 		if pipelineError.Valid {
// 			p.Error = pipelineError.String
// 		}
//
// 		if exitCode.Valid {
// 			p.ExitCode = int(exitCode.Int64)
// 		}
//
// 		if v, err := time.Parse(time.RFC3339, startedAt); err == nil {
// 			p.StartedAt = v
// 		}
//
// 		if v, err := time.Parse(time.RFC3339, updatedAt); err == nil {
// 			p.UpdatedAt = v
// 		}
//
// 		if finishedAt.Valid {
// 			p.FinishedAt = finishedAt.Time
// 		}
//
// 		pipelines = append(pipelines, p)
// 	}
//
// 	if err := rows.Err(); err != nil {
// 		return nil, err
// 	}
//
// 	return pipelines, nil
// }