forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package db 2 3import ( 4 "encoding/json" 5 "fmt" 6 "time" 7 8 "tangled.sh/tangled.sh/core/api/tangled" 9 "tangled.sh/tangled.sh/core/notifier" 10 "tangled.sh/tangled.sh/core/spindle/models" 11 "tangled.sh/tangled.sh/core/tid" 12) 13 14type Event struct { 15 Rkey string `json:"rkey"` 16 Nsid string `json:"nsid"` 17 Created int64 `json:"created"` 18 EventJson string `json:"event"` 19} 20 21func (d *DB) InsertEvent(event Event, notifier *notifier.Notifier) error { 22 _, err := d.Exec( 23 `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`, 24 event.Rkey, 25 event.Nsid, 26 event.EventJson, 27 time.Now().UnixNano(), 28 ) 29 30 notifier.NotifyAll() 31 32 return err 33} 34 35func (d *DB) GetEvents(cursor int64) ([]Event, error) { 36 whereClause := "" 37 args := []any{} 38 if cursor > 0 { 39 whereClause = "where created > ?" 40 args = append(args, cursor) 41 } 42 43 query := fmt.Sprintf(` 44 select rkey, nsid, event, created 45 from events 46 %s 47 order by created asc 48 limit 100 49 `, whereClause) 50 51 rows, err := d.Query(query, args...) 52 if err != nil { 53 return nil, err 54 } 55 defer rows.Close() 56 57 var evts []Event 58 for rows.Next() { 59 var ev Event 60 if err := rows.Scan(&ev.Rkey, &ev.Nsid, &ev.EventJson, &ev.Created); err != nil { 61 return nil, err 62 } 63 evts = append(evts, ev) 64 } 65 66 if err := rows.Err(); err != nil { 67 return nil, err 68 } 69 70 return evts, nil 71} 72 73func (d *DB) CreateStatusEvent(rkey string, s tangled.PipelineStatus, n *notifier.Notifier) error { 74 eventJson, err := json.Marshal(s) 75 if err != nil { 76 return err 77 } 78 79 event := Event{ 80 Rkey: rkey, 81 Nsid: tangled.PipelineStatusNSID, 82 Created: time.Now().UnixNano(), 83 EventJson: string(eventJson), 84 } 85 86 return d.InsertEvent(event, n) 87} 88 89type StatusKind string 90 91var ( 92 StatusKindPending StatusKind = "pending" 93 StatusKindRunning StatusKind = "running" 94 StatusKindFailed StatusKind = "failed" 95 StatusKindTimeout StatusKind = "timeout" 96 StatusKindCancelled StatusKind = "cancelled" 97 StatusKindSuccess StatusKind = "success" 98) 99 100func (d *DB) createStatusEvent( 101 workflowId models.WorkflowId, 102 statusKind StatusKind, 103 workflowError *string, 104 exitCode *int64, 105 n *notifier.Notifier, 106) error { 107 now := time.Now() 108 pipelineAtUri := workflowId.PipelineId.AtUri() 109 s := tangled.PipelineStatus{ 110 CreatedAt: now.Format(time.RFC3339), 111 Error: workflowError, 112 ExitCode: exitCode, 113 Pipeline: string(pipelineAtUri), 114 Workflow: workflowId.Name, 115 Status: string(statusKind), 116 } 117 118 eventJson, err := json.Marshal(s) 119 if err != nil { 120 return err 121 } 122 123 event := Event{ 124 Rkey: tid.TID(), 125 Nsid: tangled.PipelineStatusNSID, 126 Created: now.UnixNano(), 127 EventJson: string(eventJson), 128 } 129 130 return d.InsertEvent(event, n) 131 132} 133 134func (d *DB) StatusPending(workflowId models.WorkflowId, n *notifier.Notifier) error { 135 return d.createStatusEvent(workflowId, StatusKindPending, nil, nil, n) 136} 137 138func (d *DB) StatusRunning(workflowId models.WorkflowId, n *notifier.Notifier) error { 139 return d.createStatusEvent(workflowId, StatusKindRunning, nil, nil, n) 140} 141 142func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 143 return d.createStatusEvent(workflowId, StatusKindFailed, &workflowError, &exitCode, n) 144} 145 146func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error { 147 return d.createStatusEvent(workflowId, StatusKindSuccess, nil, nil, n) 148}