forked from tangled.org/core
this repo has no description
1package db 2 3import ( 4 "encoding/json" 5 "fmt" 6 "time" 7 8 "tangled.sh/tangled.sh/core/api/tangled" 9 "tangled.sh/tangled.sh/core/notifier" 10 "tangled.sh/tangled.sh/core/spindle/models" 11 "tangled.sh/tangled.sh/core/tid" 12) 13 14type Event struct { 15 Rkey string `json:"rkey"` 16 Nsid string `json:"nsid"` 17 Created int64 `json:"created"` 18 EventJson string `json:"event"` 19} 20 21func (d *DB) InsertEvent(event Event, notifier *notifier.Notifier) error { 22 _, err := d.Exec( 23 `insert into events (rkey, nsid, event, created) values (?, ?, ?, ?)`, 24 event.Rkey, 25 event.Nsid, 26 event.EventJson, 27 time.Now().UnixNano(), 28 ) 29 30 notifier.NotifyAll() 31 32 return err 33} 34 35func (d *DB) GetEvents(cursor int64) ([]Event, error) { 36 whereClause := "" 37 args := []any{} 38 if cursor > 0 { 39 whereClause = "where created > ?" 40 args = append(args, cursor) 41 } 42 43 query := fmt.Sprintf(` 44 select rkey, nsid, event, created 45 from events 46 %s 47 order by created asc 48 limit 100 49 `, whereClause) 50 51 rows, err := d.Query(query, args...) 52 if err != nil { 53 return nil, err 54 } 55 defer rows.Close() 56 57 var evts []Event 58 for rows.Next() { 59 var ev Event 60 if err := rows.Scan(&ev.Rkey, &ev.Nsid, &ev.EventJson, &ev.Created); err != nil { 61 return nil, err 62 } 63 evts = append(evts, ev) 64 } 65 66 if err := rows.Err(); err != nil { 67 return nil, err 68 } 69 70 return evts, nil 71} 72 73func (d *DB) CreateStatusEvent(rkey string, s tangled.PipelineStatus, n *notifier.Notifier) error { 74 eventJson, err := json.Marshal(s) 75 if err != nil { 76 return err 77 } 78 79 event := Event{ 80 Rkey: rkey, 81 Nsid: tangled.PipelineStatusNSID, 82 Created: time.Now().UnixNano(), 83 EventJson: string(eventJson), 84 } 85 86 return d.InsertEvent(event, n) 87} 88 89func (d *DB) createStatusEvent( 90 workflowId models.WorkflowId, 91 statusKind models.StatusKind, 92 workflowError *string, 93 exitCode *int64, 94 n *notifier.Notifier, 95) error { 96 now := time.Now() 97 pipelineAtUri := workflowId.PipelineId.AtUri() 98 s := tangled.PipelineStatus{ 99 CreatedAt: now.Format(time.RFC3339), 100 Error: workflowError, 101 ExitCode: exitCode, 102 Pipeline: string(pipelineAtUri), 103 Workflow: workflowId.Name, 104 Status: string(statusKind), 105 } 106 107 eventJson, err := json.Marshal(s) 108 if err != nil { 109 return err 110 } 111 112 event := Event{ 113 Rkey: tid.TID(), 114 Nsid: tangled.PipelineStatusNSID, 115 Created: now.UnixNano(), 116 EventJson: string(eventJson), 117 } 118 119 return d.InsertEvent(event, n) 120 121} 122 123func (d *DB) StatusPending(workflowId models.WorkflowId, n *notifier.Notifier) error { 124 return d.createStatusEvent(workflowId, models.StatusKindPending, nil, nil, n) 125} 126 127func (d *DB) StatusRunning(workflowId models.WorkflowId, n *notifier.Notifier) error { 128 return d.createStatusEvent(workflowId, models.StatusKindRunning, nil, nil, n) 129} 130 131func (d *DB) StatusFailed(workflowId models.WorkflowId, workflowError string, exitCode int64, n *notifier.Notifier) error { 132 return d.createStatusEvent(workflowId, models.StatusKindFailed, &workflowError, &exitCode, n) 133} 134 135func (d *DB) StatusSuccess(workflowId models.WorkflowId, n *notifier.Notifier) error { 136 return d.createStatusEvent(workflowId, models.StatusKindSuccess, nil, nil, n) 137}