forked from tangled.org/core
this repo has no description
at master 3.9 kB view raw
1package db 2 3import ( 4 "encoding/json" 5 "fmt" 6 "time" 7 8 "tangled.org/core/api/tangled" 9 "tangled.org/core/notifier" 10 "tangled.org/core/spindle/models" 11 "tangled.org/core/tid" 12) 13 14type Event struct { 15 Rkey string `json:"rkey"` 16 Nsid string `json:"nsid"` 17 Created int64 `json:"created"` 18 EventJson string `json:"event"` 19} 20 21func (d *DB) InsertEvent(event Event, notifier *notifier.Notifier) error { 22 _, err := d.Exec( 23 `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`, 24 event.Rkey, 25 event.Nsid, 26 event.EventJson, 27 time.Now().UnixNano(), 28 ) 29 30 notifier.NotifyAll() 31 32 return err 33} 34 35func (d *DB) GetEvents(cursor int64) ([]Event, error) { 36 whereClause := "" 37 args := []any{} 38 if cursor > 0 { 39 whereClause = "where created > ?" 40 args = append(args, cursor) 41 } 42 43 query := fmt.Sprintf(` 44 select rkey, nsid, event, created 45 from events 46 %s 47 order by created asc 48 limit 100 49 `, whereClause) 50 51 rows, err := d.Query(query, args...) 52 if err != nil { 53 return nil, err 54 } 55 defer rows.Close() 56 57 var evts []Event 58 for rows.Next() { 59 var ev Event 60 if err := rows.Scan(&ev.Rkey, &ev.Nsid, &ev.EventJson, &ev.Created); err != nil { 61 return nil, err 62 } 63 evts = append(evts, ev) 64 } 65 66 if err := rows.Err(); err != nil { 67 return nil, err 68 } 69 70 return evts, nil 71} 72 73func (d *DB) CreateStatusEvent(rkey string, s tangled.PipelineStatus, n *notifier.Notifier) error { 74 eventJson, err := json.Marshal(s) 75 if err != nil { 76 return err 77 } 78 79 event := Event{ 80 Rkey: rkey, 81 Nsid: tangled.PipelineStatusNSID, 82 Created: time.Now().UnixNano(), 83 EventJson: string(eventJson), 84 } 85 86 return d.InsertEvent(event, n) 87} 88 89func (d *DB) createStatusEvent( 90 workflowId models.WorkflowId, 91 statusKind models.StatusKind, 92 workflowError *string, 93 exitCode *int64, 94 n *notifier.Notifier, 95) error { 96 now := time.Now() 97 pipelineAtUri := workflowId.PipelineId.AtUri() 98 s := tangled.PipelineStatus{ 99 CreatedAt: now.Format(time.RFC3339), 100 Error: workflowError, 101 ExitCode: exitCode, 102 Pipeline: string(pipelineAtUri), 103 Workflow: workflowId.Name, 104 Status: string(statusKind), 105 } 106 107 eventJson, err := json.Marshal(s) 108 if err != nil { 109 return err 110 } 111 112 event := Event{ 113 Rkey: tid.TID(), 114 Nsid: tangled.PipelineStatusNSID, 115 Created: now.UnixNano(), 116 EventJson: string(eventJson), 117 } 118 119 return d.InsertEvent(event, n) 120 121} 122 123func (d *DB) GetStatus(workflowId models.WorkflowId) (*tangled.PipelineStatus, error) { 124 pipelineAtUri := workflowId.PipelineId.AtUri() 125 126 var eventJson string 127 err := d.QueryRow( 128 ` 129 select 130 event from events 131 where 132 nsid = ? 133 and json_extract(event, '$.pipeline') = ? 134 and json_extract(event, '$.workflow') = ? 135 order by 136 created desc 137 limit 138 1 139 `, 140 tangled.PipelineStatusNSID, 141 string(pipelineAtUri), 142 workflowId.Name, 143 ).Scan(&eventJson) 144 145 if err != nil { 146 return nil, err 147 } 148 149 var status tangled.PipelineStatus 150 if err := json.Unmarshal([]byte(eventJson), &status); err != nil { 151 return nil, err 152 } 153 154 return &status, nil 155} 156 157func (d *DB) StatusPending(workflowId models.WorkflowId, n *notifier.Notifier) error { 158 return d.createStatusEvent(workflowId, models.StatusKindPending, nil, nil, n) 159} 160 161func (d *DB) StatusRunning(workflowId models.WorkflowId, n *notifier.Notifier) error { 162 return d.createStatusEvent(workflowId, models.StatusKindRunning, nil, nil, n) 163} 164 165func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 166 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n) 167} 168 169func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error { 170 return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n) 171} 172 173func (d *DB) StatusTimeout(workflowId models.WorkflowId, n *notifier.Notifier) error { 174 return d.createStatusEvent(workflowId, models.StatusKindTimeout, nil, nil, n) 175}