forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package queue
2
3import (
4 "sync"
5)
6
7type Job struct {
8 Run func() error
9 OnFail func(error)
10}
11
12type Queue struct {
13 jobs chan Job
14 workers int
15 wg sync.WaitGroup
16}
17
18func NewQueue(queueSize, numWorkers int) *Queue {
19 return &Queue{
20 jobs: make(chan Job, queueSize),
21 workers: numWorkers,
22 }
23}
24
25func (q *Queue) Enqueue(job Job) bool {
26 select {
27 case q.jobs <- job:
28 return true
29 default:
30 return false
31 }
32}
33
34func (q *Queue) Start() {
35 for range q.workers {
36 q.wg.Add(1)
37 go q.worker()
38 }
39}
40
41func (q *Queue) worker() {
42 defer q.wg.Done()
43 for job := range q.jobs {
44 if err := job.Run(); err != nil {
45 if job.OnFail != nil {
46 job.OnFail(err)
47 }
48 }
49 }
50}
51
52func (q *Queue) Stop() {
53 close(q.jobs)
54 q.wg.Wait()
55}