···
11
+
"tangled.sh/tangled.sh/core/log"
13
+
"github.com/gorilla/websocket"
16
+
type ProcessFunc func(source string, message []byte) error
18
+
type ConsumerConfig struct {
20
+
ProcessFunc ProcessFunc
21
+
RetryInterval time.Duration
22
+
MaxRetryInterval time.Duration
23
+
ConnectionTimeout time.Duration
28
+
type EventConsumer struct {
31
+
dialer *websocket.Dialer
35
+
randSource *rand.Rand
43
+
func NewEventConsumer(cfg ConsumerConfig) *EventConsumer {
44
+
if cfg.RetryInterval == 0 {
45
+
cfg.RetryInterval = 15 * time.Minute
47
+
if cfg.ConnectionTimeout == 0 {
48
+
cfg.ConnectionTimeout = 10 * time.Second
50
+
if cfg.WorkerCount <= 0 {
53
+
if cfg.MaxRetryInterval == 0 {
54
+
cfg.MaxRetryInterval = 1 * time.Hour
56
+
if cfg.Logger == nil {
57
+
cfg.Logger = log.New("eventconsumer")
59
+
return &EventConsumer{
61
+
dialer: websocket.DefaultDialer,
62
+
jobQueue: make(chan job, 100), // buffered job queue
64
+
randSource: rand.New(rand.NewSource(time.Now().UnixNano())),
68
+
func (c *EventConsumer) Start(ctx context.Context) {
70
+
for range c.cfg.WorkerCount {
76
+
for _, source := range c.cfg.Sources {
78
+
go c.startConnectionLoop(ctx, source)
82
+
func (c *EventConsumer) Stop() {
83
+
c.connMap.Range(func(_, val any) bool {
84
+
if conn, ok := val.(*websocket.Conn); ok {
93
+
func (c *EventConsumer) worker(ctx context.Context) {
99
+
case j, ok := <-c.jobQueue:
103
+
if err := c.cfg.ProcessFunc(j.source, j.message); err != nil {
104
+
c.logger.Error("error processing message", "source", j.source, "err", err)
110
+
func (c *EventConsumer) startConnectionLoop(ctx context.Context, source string) {
112
+
retryInterval := c.cfg.RetryInterval
118
+
err := c.runConnection(ctx, source)
120
+
c.logger.Error("connection failed", "source", source, "err", err)
124
+
jitter := time.Duration(c.randSource.Int63n(int64(retryInterval) / 5))
125
+
delay := retryInterval + jitter
127
+
if retryInterval < c.cfg.MaxRetryInterval {
129
+
if retryInterval > c.cfg.MaxRetryInterval {
130
+
retryInterval = c.cfg.MaxRetryInterval
133
+
c.logger.Info("retrying connection", "source", source, "delay", delay)
135
+
case <-time.After(delay):
143
+
func (c *EventConsumer) runConnection(ctx context.Context, source string) error {
144
+
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
147
+
u, err := url.Parse(source)
152
+
conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil)
157
+
c.connMap.Store(source, conn)
158
+
defer c.connMap.Delete(source)
160
+
c.logger.Info("connected", "source", source)
167
+
msgType, msg, err := conn.ReadMessage()
171
+
if msgType != websocket.TextMessage {
175
+
case c.jobQueue <- job{source: source, message: msg}: