···
11
+
"tangled.sh/tangled.sh/core/log"
13
+
"github.com/gorilla/websocket"
16
+
type ProcessFunc func(source string, message []byte) error
18
+
type ConsumerConfig struct {
20
+
ProcessFunc ProcessFunc
21
+
RetryInterval time.Duration
22
+
MaxRetryInterval time.Duration
23
+
ConnectionTimeout time.Duration
29
+
type EventConsumer struct {
32
+
dialer *websocket.Dialer
36
+
randSource *rand.Rand
44
+
func NewEventConsumer(cfg ConsumerConfig) *EventConsumer {
45
+
if cfg.RetryInterval == 0 {
46
+
cfg.RetryInterval = 15 * time.Minute
48
+
if cfg.ConnectionTimeout == 0 {
49
+
cfg.ConnectionTimeout = 10 * time.Second
51
+
if cfg.WorkerCount <= 0 {
54
+
if cfg.MaxRetryInterval == 0 {
55
+
cfg.MaxRetryInterval = 1 * time.Hour
57
+
if cfg.Logger == nil {
58
+
cfg.Logger = log.New("eventconsumer")
60
+
if cfg.QueueSize == 0 {
63
+
return &EventConsumer{
65
+
dialer: websocket.DefaultDialer,
66
+
jobQueue: make(chan job, cfg.QueueSize), // buffered job queue
68
+
randSource: rand.New(rand.NewSource(time.Now().UnixNano())),
72
+
func (c *EventConsumer) Start(ctx context.Context) {
74
+
for range c.cfg.WorkerCount {
80
+
for _, source := range c.cfg.Sources {
82
+
go c.startConnectionLoop(ctx, source)
86
+
func (c *EventConsumer) Stop() {
87
+
c.connMap.Range(func(_, val any) bool {
88
+
if conn, ok := val.(*websocket.Conn); ok {
97
+
func (c *EventConsumer) worker(ctx context.Context) {
103
+
case j, ok := <-c.jobQueue:
107
+
if err := c.cfg.ProcessFunc(j.source, j.message); err != nil {
108
+
c.logger.Error("error processing message", "source", j.source, "err", err)
114
+
func (c *EventConsumer) startConnectionLoop(ctx context.Context, source string) {
116
+
retryInterval := c.cfg.RetryInterval
122
+
err := c.runConnection(ctx, source)
124
+
c.logger.Error("connection failed", "source", source, "err", err)
128
+
jitter := time.Duration(c.randSource.Int63n(int64(retryInterval) / 5))
129
+
delay := retryInterval + jitter
131
+
if retryInterval < c.cfg.MaxRetryInterval {
133
+
if retryInterval > c.cfg.MaxRetryInterval {
134
+
retryInterval = c.cfg.MaxRetryInterval
137
+
c.logger.Info("retrying connection", "source", source, "delay", delay)
139
+
case <-time.After(delay):
147
+
func (c *EventConsumer) runConnection(ctx context.Context, source string) error {
148
+
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
151
+
u, err := url.Parse(source)
156
+
conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil)
161
+
c.connMap.Store(source, conn)
162
+
defer c.connMap.Delete(source)
164
+
c.logger.Info("connected", "source", source)
171
+
msgType, msg, err := conn.ReadMessage()
175
+
if msgType != websocket.TextMessage {
179
+
case c.jobQueue <- job{source: source, message: msg}: