···
+
"tangled.sh/tangled.sh/core/log"
+
"github.com/gorilla/websocket"
+
type ProcessFunc func(source string, message []byte) error
+
type ConsumerConfig struct {
+
ProcessFunc ProcessFunc
+
RetryInterval time.Duration
+
MaxRetryInterval time.Duration
+
ConnectionTimeout time.Duration
+
type EventConsumer struct {
+
cancel context.CancelFunc
+
dialer *websocket.Dialer
+
func NewEventConsumer(cfg ConsumerConfig) *EventConsumer {
+
if cfg.RetryInterval == 0 {
+
cfg.RetryInterval = 15 * time.Minute
+
if cfg.ConnectionTimeout == 0 {
+
cfg.ConnectionTimeout = 10 * time.Second
+
if cfg.WorkerCount <= 0 {
+
if cfg.MaxRetryInterval == 0 {
+
cfg.MaxRetryInterval = 1 * time.Hour
+
cfg.Logger = log.New("eventconsumer")
+
ctx, cancel := context.WithCancel(context.Background())
+
dialer: websocket.DefaultDialer,
+
jobQueue: make(chan job, 100), // buffered job queue
+
randSource: rand.New(rand.NewSource(time.Now().UnixNano())),
+
func (c *EventConsumer) Start() {
+
for range c.cfg.WorkerCount {
+
for _, source := range c.cfg.Sources {
+
go c.startConnectionLoop(source)
+
func (c *EventConsumer) Stop() {
+
c.connMap.Range(func(_, val any) bool {
+
if conn, ok := val.(*websocket.Conn); ok {
+
func (c *EventConsumer) worker() {
+
case j, ok := <-c.jobQueue:
+
if err := c.cfg.ProcessFunc(j.source, j.message); err != nil {
+
c.logger.Error("error processing message", "source", j.source, "err", err)
+
func (c *EventConsumer) startConnectionLoop(source string) {
+
retryInterval := c.cfg.RetryInterval
+
err := c.runConnection(source)
+
c.logger.Error("connection failed", "source", source, "err", err)
+
jitter := time.Duration(c.randSource.Int63n(int64(retryInterval) / 5))
+
delay := retryInterval + jitter
+
if retryInterval < c.cfg.MaxRetryInterval {
+
if retryInterval > c.cfg.MaxRetryInterval {
+
retryInterval = c.cfg.MaxRetryInterval
+
c.logger.Info("retrying connection", "source", source, "delay", delay)
+
case <-time.After(delay):
+
func (c *EventConsumer) runConnection(source string) error {
+
ctx, cancel := context.WithTimeout(c.ctx, c.cfg.ConnectionTimeout)
+
u, err := url.Parse(source)
+
conn, _, err := c.dialer.DialContext(ctx, u.String(), nil)
+
c.connMap.Store(source, conn)
+
defer c.connMap.Delete(source)
+
c.logger.Info("connected", "source", source)
+
msgType, msg, err := conn.ReadMessage()
+
if msgType != websocket.TextMessage {
+
case c.jobQueue <- job{source: source, message: msg}: