···
11
+
"tangled.sh/tangled.sh/core/log"
13
+
"github.com/gorilla/websocket"
16
+
type ProcessFunc func(source string, message []byte) error
18
+
type ConsumerConfig struct {
20
+
ProcessFunc ProcessFunc
21
+
RetryInterval time.Duration
22
+
MaxRetryInterval time.Duration
23
+
ConnectionTimeout time.Duration
28
+
type EventConsumer struct {
31
+
cancel context.CancelFunc
33
+
dialer *websocket.Dialer
37
+
randSource *rand.Rand
45
+
func NewEventConsumer(cfg ConsumerConfig) *EventConsumer {
46
+
if cfg.RetryInterval == 0 {
47
+
cfg.RetryInterval = 15 * time.Minute
49
+
if cfg.ConnectionTimeout == 0 {
50
+
cfg.ConnectionTimeout = 10 * time.Second
52
+
if cfg.WorkerCount <= 0 {
55
+
if cfg.MaxRetryInterval == 0 {
56
+
cfg.MaxRetryInterval = 1 * time.Hour
58
+
if cfg.Logger == nil {
59
+
cfg.Logger = log.New("eventconsumer")
62
+
ctx, cancel := context.WithCancel(context.Background())
64
+
return &EventConsumer{
68
+
dialer: websocket.DefaultDialer,
69
+
jobQueue: make(chan job, 100), // buffered job queue
71
+
randSource: rand.New(rand.NewSource(time.Now().UnixNano())),
75
+
func (c *EventConsumer) Start() {
77
+
for range c.cfg.WorkerCount {
83
+
for _, source := range c.cfg.Sources {
85
+
go c.startConnectionLoop(source)
89
+
func (c *EventConsumer) Stop() {
91
+
c.connMap.Range(func(_, val any) bool {
92
+
if conn, ok := val.(*websocket.Conn); ok {
101
+
func (c *EventConsumer) worker() {
105
+
case <-c.ctx.Done():
107
+
case j, ok := <-c.jobQueue:
111
+
if err := c.cfg.ProcessFunc(j.source, j.message); err != nil {
112
+
c.logger.Error("error processing message", "source", j.source, "err", err)
118
+
func (c *EventConsumer) startConnectionLoop(source string) {
121
+
retryInterval := c.cfg.RetryInterval
125
+
case <-c.ctx.Done():
128
+
err := c.runConnection(source)
130
+
c.logger.Error("connection failed", "source", source, "err", err)
134
+
jitter := time.Duration(c.randSource.Int63n(int64(retryInterval) / 5))
135
+
delay := retryInterval + jitter
137
+
if retryInterval < c.cfg.MaxRetryInterval {
139
+
if retryInterval > c.cfg.MaxRetryInterval {
140
+
retryInterval = c.cfg.MaxRetryInterval
144
+
c.logger.Info("retrying connection", "source", source, "delay", delay)
146
+
case <-time.After(delay):
147
+
case <-c.ctx.Done():
154
+
func (c *EventConsumer) runConnection(source string) error {
155
+
ctx, cancel := context.WithTimeout(c.ctx, c.cfg.ConnectionTimeout)
158
+
u, err := url.Parse(source)
163
+
conn, _, err := c.dialer.DialContext(ctx, u.String(), nil)
169
+
c.connMap.Store(source, conn)
170
+
defer c.connMap.Delete(source)
172
+
c.logger.Info("connected", "source", source)
176
+
case <-c.ctx.Done():
179
+
msgType, msg, err := conn.ReadMessage()
183
+
if msgType != websocket.TextMessage {
188
+
case c.jobQueue <- job{source: source, message: msg}:
189
+
case <-c.ctx.Done():