From aee5c33094b978a67c84ecc13e9606736119715c Mon Sep 17 00:00:00 2001 From: oppiliappan Date: Mon, 9 Jun 2025 15:01:55 +0100 Subject: [PATCH] knotclient: introduce CursorStore Change-Id: korrmpolxvurpyspyywouzopkyvxmmkn consumers can configure a cursor-store, where cursors of individual event sources are stored. the module provides an in-memory store and a redis-backed store. Signed-off-by: oppiliappan --- cmd/eventconsumer/main.go | 18 +++++++---- knotclient/events.go | 68 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 77 insertions(+), 9 deletions(-) diff --git a/cmd/eventconsumer/main.go b/cmd/eventconsumer/main.go index 45f9a8f..1e62554 100644 --- a/cmd/eventconsumer/main.go +++ b/cmd/eventconsumer/main.go @@ -11,27 +11,31 @@ import ( ) func main() { - sourcesFlag := flag.String("sources", "", "list of wss sources") + knots := flag.String("knots", "", "list of knots to connect to") retryFlag := flag.Duration("retry", 1*time.Minute, "retry interval") maxRetryFlag := flag.Duration("max-retry", 30*time.Minute, "max retry interval") workerCount := flag.Int("workers", 10, "goroutine pool size") flag.Parse() - if *sourcesFlag == "" { - fmt.Println("error: -sources is required") + if *knots == "" { + fmt.Println("error: -knots is required") flag.Usage() return } - sources := strings.Split(*sourcesFlag, ",") + var srcs []knotclient.EventSource + for k := range strings.SplitSeq(*knots, ",") { + srcs = append(srcs, knotclient.EventSource{k}) + } consumer := knotclient.NewEventConsumer(knotclient.ConsumerConfig{ - Sources: sources, + Sources: srcs, ProcessFunc: processEvent, RetryInterval: *retryFlag, MaxRetryInterval: *maxRetryFlag, WorkerCount: *workerCount, + Dev: true, }) ctx, cancel := context.WithCancel(context.Background()) @@ -41,7 +45,7 @@ func main() { consumer.Stop() } -func processEvent(source string, msg []byte) error { - fmt.Printf("From %s: %s\n", source, string(msg)) +func processEvent(source knotclient.EventSource, msg knotclient.Message) error { + fmt.Printf("From %s (%s, %s): %s\n", source.Knot, msg.Rkey, msg.Nsid, string(msg.EventJson)) return nil } diff --git a/knotclient/events.go b/knotclient/events.go index f4c1fa8..e23116e 100644 --- a/knotclient/events.go +++ b/knotclient/events.go @@ -3,12 +3,14 @@ package knotclient import ( "context" "encoding/json" + "fmt" "log/slog" "math/rand" "net/url" "sync" "time" + "tangled.sh/tangled.sh/core/appview/cache" "tangled.sh/tangled.sh/core/log" "github.com/gorilla/websocket" @@ -20,7 +22,7 @@ type Message struct { Rkey string Nsid string // do not full deserialize this portion of the message, processFunc can do that - EventJson json.RawMessage + EventJson json.RawMessage `json:"event"` } type ConsumerConfig struct { @@ -33,6 +35,7 @@ type ConsumerConfig struct { QueueSize int Logger *slog.Logger Dev bool + CursorStore CursorStore } type EventSource struct { @@ -58,6 +61,58 @@ type EventConsumer struct { mu sync.RWMutex } +type CursorStore interface { + Set(knot, cursor string) + Get(knot string) (cursor string) +} + +type RedisCursorStore struct { + rdb *cache.Cache +} + +func NewRedisCursorStore(cache *cache.Cache) RedisCursorStore { + return RedisCursorStore{ + rdb: cache, + } +} + +const ( + cursorKey = "cursor:%s" +) + +func (r *RedisCursorStore) Set(knot, cursor string) { + key := fmt.Sprintf(cursorKey, knot) + r.rdb.Set(context.Background(), key, cursor, 0) +} + +func (r *RedisCursorStore) Get(knot string) (cursor string) { + key := fmt.Sprintf(cursorKey, knot) + val, err := r.rdb.Get(context.Background(), key).Result() + if err != nil { + return "" + } + + return val +} + +type MemoryCursorStore struct { + store sync.Map +} + +func (m *MemoryCursorStore) Set(knot, cursor string) { + m.store.Store(knot, cursor) +} + +func (m *MemoryCursorStore) Get(knot string) (cursor string) { + if result, ok := m.store.Load(knot); ok { + if val, ok := result.(string); ok { + return val + } + } + + return "" +} + func (e *EventConsumer) buildUrl(s EventSource, cursor string) (*url.URL, error) { scheme := "wss" if e.cfg.Dev { @@ -101,6 +156,9 @@ func NewEventConsumer(cfg ConsumerConfig) *EventConsumer { if cfg.QueueSize == 0 { cfg.QueueSize = 100 } + if cfg.CursorStore == nil { + cfg.CursorStore = &MemoryCursorStore{} + } return &EventConsumer{ cfg: cfg, dialer: websocket.DefaultDialer, @@ -111,6 +169,8 @@ func NewEventConsumer(cfg ConsumerConfig) *EventConsumer { } func (c *EventConsumer) Start(ctx context.Context) { + c.cfg.Logger.Info("starting consumer", "config", c.cfg) + // start workers for range c.cfg.WorkerCount { c.wg.Add(1) @@ -160,6 +220,10 @@ func (c *EventConsumer) worker(ctx context.Context) { c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err) return } + + // update cursor + c.cfg.CursorStore.Set(j.source.Knot, msg.Rkey) + if err := c.cfg.ProcessFunc(j.source, msg); err != nil { c.logger.Error("error processing message", "source", j.source, "err", err) } @@ -204,7 +268,7 @@ func (c *EventConsumer) runConnection(ctx context.Context, source EventSource) e connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) defer cancel() - u, err := url.Parse(source) + cursor := c.cfg.CursorStore.Get(source.Knot) u, err := c.buildUrl(source, cursor) if err != nil { -- 2.43.0