knotclient: introduce event consumer #226

merged
opened by oppi.li targeting master from push-lrzzmtxokrxw

generic websocket consumer for one or more event streams from knots

Signed-off-by: oppiliappan <me@oppi.li>

Changed files
+228
cmd
eventconsumer
knotclient
+47
cmd/eventconsumer/main.go
···
+
package main
+
+
import (
+
"context"
+
"flag"
+
"fmt"
+
"strings"
+
"time"
+
+
"tangled.sh/tangled.sh/core/knotclient"
+
)
+
+
func main() {
+
sourcesFlag := flag.String("sources", "", "list of wss sources")
+
retryFlag := flag.Duration("retry", 1*time.Minute, "retry interval")
+
maxRetryFlag := flag.Duration("max-retry", 30*time.Minute, "max retry interval")
+
workerCount := flag.Int("workers", 10, "goroutine pool size")
+
+
flag.Parse()
+
+
if *sourcesFlag == "" {
+
fmt.Println("error: -sources is required")
+
flag.Usage()
+
return
+
}
+
+
sources := strings.Split(*sourcesFlag, ",")
+
+
consumer := knotclient.NewEventConsumer(knotclient.ConsumerConfig{
+
Sources: sources,
+
ProcessFunc: processEvent,
+
RetryInterval: *retryFlag,
+
MaxRetryInterval: *maxRetryFlag,
+
WorkerCount: *workerCount,
+
})
+
+
ctx, cancel := context.WithCancel(context.Background())
+
consumer.Start(ctx)
+
time.Sleep(1 * time.Hour)
+
cancel()
+
consumer.Stop()
+
}
+
+
// processEvent is the ProcessFunc used by this command: it writes the
// originating source and the raw message text to stdout and always
// reports success.
func processEvent(source string, msg []byte) error {
	text := string(msg)
	fmt.Printf("From %s: %s\n", source, text)
	return nil
}
+181
knotclient/events.go
···
+
package knotclient
+
+
import (
+
"context"
+
"log/slog"
+
"math/rand"
+
"net/url"
+
"sync"
+
"time"
+
+
"tangled.sh/tangled.sh/core/log"
+
+
"github.com/gorilla/websocket"
+
)
+
+
// ProcessFunc handles one message received from source. It is invoked from
// the consumer's worker goroutines, so with more than one worker it may be
// called concurrently. A non-nil error is logged by the worker; the message
// is not retried.
type ProcessFunc func(source string, message []byte) error
+
+
type ConsumerConfig struct {
+
Sources []string
+
ProcessFunc ProcessFunc
+
RetryInterval time.Duration
+
MaxRetryInterval time.Duration
+
ConnectionTimeout time.Duration
+
WorkerCount int
+
Logger *slog.Logger
+
}
+
+
type EventConsumer struct {
+
cfg ConsumerConfig
+
wg sync.WaitGroup
+
dialer *websocket.Dialer
+
connMap sync.Map
+
jobQueue chan job
+
logger *slog.Logger
+
randSource *rand.Rand
+
}
+
+
// job is one unit of work on the queue: a message together with the
// source it was received from.
type job struct {
	// source is the source string the message was read from.
	source string
	// message is the raw payload of the websocket message.
	message []byte
}
+
+
func NewEventConsumer(cfg ConsumerConfig) *EventConsumer {
+
if cfg.RetryInterval == 0 {
+
cfg.RetryInterval = 15 * time.Minute
+
}
+
if cfg.ConnectionTimeout == 0 {
+
cfg.ConnectionTimeout = 10 * time.Second
+
}
+
if cfg.WorkerCount <= 0 {
+
cfg.WorkerCount = 5
+
}
+
if cfg.MaxRetryInterval == 0 {
+
cfg.MaxRetryInterval = 1 * time.Hour
+
}
+
if cfg.Logger == nil {
+
cfg.Logger = log.New("eventconsumer")
+
}
+
return &EventConsumer{
+
cfg: cfg,
+
dialer: websocket.DefaultDialer,
+
jobQueue: make(chan job, 100), // buffered job queue
+
logger: cfg.Logger,
+
randSource: rand.New(rand.NewSource(time.Now().UnixNano())),
+
}
+
}
+
+
func (c *EventConsumer) Start(ctx context.Context) {
+
// start workers
+
for range c.cfg.WorkerCount {
+
c.wg.Add(1)
+
go c.worker(ctx)
+
}
+
+
// start streaming
+
for _, source := range c.cfg.Sources {
+
c.wg.Add(1)
+
go c.startConnectionLoop(ctx, source)
+
}
+
}
+
+
func (c *EventConsumer) Stop() {
+
c.connMap.Range(func(_, val any) bool {
+
if conn, ok := val.(*websocket.Conn); ok {
+
conn.Close()
+
}
+
return true
+
})
+
c.wg.Wait()
+
close(c.jobQueue)
+
}
+
+
func (c *EventConsumer) worker(ctx context.Context) {
+
defer c.wg.Done()
+
for {
+
select {
+
case <-ctx.Done():
+
return
+
case j, ok := <-c.jobQueue:
+
if !ok {
+
return
+
}
+
if err := c.cfg.ProcessFunc(j.source, j.message); err != nil {
+
c.logger.Error("error processing message", "source", j.source, "err", err)
+
}
+
}
+
}
+
}
+
+
func (c *EventConsumer) startConnectionLoop(ctx context.Context, source string) {
+
defer c.wg.Done()
+
retryInterval := c.cfg.RetryInterval
+
for {
+
select {
+
case <-ctx.Done():
+
return
+
default:
+
err := c.runConnection(ctx, source)
+
if err != nil {
+
c.logger.Error("connection failed", "source", source, "err", err)
+
}
+
+
// apply jitter
+
jitter := time.Duration(c.randSource.Int63n(int64(retryInterval) / 5))
+
delay := retryInterval + jitter
+
+
if retryInterval < c.cfg.MaxRetryInterval {
+
retryInterval *= 2
+
if retryInterval > c.cfg.MaxRetryInterval {
+
retryInterval = c.cfg.MaxRetryInterval
+
}
+
}
+
c.logger.Info("retrying connection", "source", source, "delay", delay)
+
select {
+
case <-time.After(delay):
+
case <-ctx.Done():
+
return
+
}
+
}
+
}
+
}
+
+
func (c *EventConsumer) runConnection(ctx context.Context, source string) error {
+
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
+
defer cancel()
+
+
u, err := url.Parse(source)
+
if err != nil {
+
return err
+
}
+
+
conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil)
+
if err != nil {
+
return err
+
}
+
defer conn.Close()
+
c.connMap.Store(source, conn)
+
defer c.connMap.Delete(source)
+
+
c.logger.Info("connected", "source", source)
+
+
for {
+
select {
+
case <-ctx.Done():
+
return nil
+
default:
+
msgType, msg, err := conn.ReadMessage()
+
if err != nil {
+
return err
+
}
+
if msgType != websocket.TextMessage {
+
continue
+
}
+
select {
+
case c.jobQueue <- job{source: source, message: msg}:
+
case <-ctx.Done():
+
return nil
+
}
+
}
+
}
+
}