knotclient: introduce CursorStore #235

merged
opened by oppi.li targeting master from push-qoplqnlvlqqo

consumers can configure a cursor-store, where cursors of individual event sources are stored. the module provides an in-memory store and a redis-backed store.

Signed-off-by: oppiliappan me@oppi.li

Changed files
+77 -9
cmd
eventconsumer
knotclient
+11 -7
cmd/eventconsumer/main.go
···
)
func main() {
-
sourcesFlag := flag.String("sources", "", "list of wss sources")
+
knots := flag.String("knots", "", "list of knots to connect to")
retryFlag := flag.Duration("retry", 1*time.Minute, "retry interval")
maxRetryFlag := flag.Duration("max-retry", 30*time.Minute, "max retry interval")
workerCount := flag.Int("workers", 10, "goroutine pool size")
flag.Parse()
-
if *sourcesFlag == "" {
-
fmt.Println("error: -sources is required")
+
if *knots == "" {
+
fmt.Println("error: -knots is required")
flag.Usage()
return
}
-
sources := strings.Split(*sourcesFlag, ",")
+
var srcs []knotclient.EventSource
+
for k := range strings.SplitSeq(*knots, ",") {
+
srcs = append(srcs, knotclient.EventSource{k})
+
}
consumer := knotclient.NewEventConsumer(knotclient.ConsumerConfig{
-
Sources: sources,
+
Sources: srcs,
ProcessFunc: processEvent,
RetryInterval: *retryFlag,
MaxRetryInterval: *maxRetryFlag,
WorkerCount: *workerCount,
+
Dev: true,
})
ctx, cancel := context.WithCancel(context.Background())
···
consumer.Stop()
}
-
func processEvent(source string, msg []byte) error {
-
fmt.Printf("From %s: %s\n", source, string(msg))
+
func processEvent(source knotclient.EventSource, msg knotclient.Message) error {
+
fmt.Printf("From %s (%s, %s): %s\n", source.Knot, msg.Rkey, msg.Nsid, string(msg.EventJson))
return nil
}
+66 -2
knotclient/events.go
···
import (
"context"
"encoding/json"
+
"fmt"
"log/slog"
"math/rand"
"net/url"
"sync"
"time"
+
"tangled.sh/tangled.sh/core/appview/cache"
"tangled.sh/tangled.sh/core/log"
"github.com/gorilla/websocket"
···
Rkey string
Nsid string
// do not full deserialize this portion of the message, processFunc can do that
-
EventJson json.RawMessage
+
EventJson json.RawMessage `json:"event"`
}
type ConsumerConfig struct {
···
QueueSize int
Logger *slog.Logger
Dev bool
+
CursorStore CursorStore
}
type EventSource struct {
···
mu sync.RWMutex
}
+
type CursorStore interface {
+
Set(knot, cursor string)
+
Get(knot string) (cursor string)
+
}
+
+
type RedisCursorStore struct {
+
rdb *cache.Cache
+
}
+
+
func NewRedisCursorStore(cache *cache.Cache) RedisCursorStore {
+
return RedisCursorStore{
+
rdb: cache,
+
}
+
}
+
+
const (
+
cursorKey = "cursor:%s"
+
)
+
+
func (r *RedisCursorStore) Set(knot, cursor string) {
+
key := fmt.Sprintf(cursorKey, knot)
+
r.rdb.Set(context.Background(), key, cursor, 0)
+
}
+
+
func (r *RedisCursorStore) Get(knot string) (cursor string) {
+
key := fmt.Sprintf(cursorKey, knot)
+
val, err := r.rdb.Get(context.Background(), key).Result()
+
if err != nil {
+
return ""
+
}
+
+
return val
+
}
+
+
type MemoryCursorStore struct {
+
store sync.Map
+
}
+
+
func (m *MemoryCursorStore) Set(knot, cursor string) {
+
m.store.Store(knot, cursor)
+
}
+
+
func (m *MemoryCursorStore) Get(knot string) (cursor string) {
+
if result, ok := m.store.Load(knot); ok {
+
if val, ok := result.(string); ok {
+
return val
+
}
+
}
+
+
return ""
+
}
+
func (e *EventConsumer) buildUrl(s EventSource, cursor string) (*url.URL, error) {
scheme := "wss"
if e.cfg.Dev {
···
if cfg.QueueSize == 0 {
cfg.QueueSize = 100
}
+
if cfg.CursorStore == nil {
+
cfg.CursorStore = &MemoryCursorStore{}
+
}
return &EventConsumer{
cfg: cfg,
dialer: websocket.DefaultDialer,
···
}
func (c *EventConsumer) Start(ctx context.Context) {
+
c.cfg.Logger.Info("starting consumer", "config", c.cfg)
+
// start workers
for range c.cfg.WorkerCount {
c.wg.Add(1)
···
c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err)
return
}
+
+
// update cursor
+
c.cfg.CursorStore.Set(j.source.Knot, msg.Rkey)
+
if err := c.cfg.ProcessFunc(j.source, msg); err != nil {
c.logger.Error("error processing message", "source", j.source, "err", err)
}
···
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
defer cancel()
-
u, err := url.Parse(source)
+
cursor := c.cfg.CursorStore.Get(source.Knot)
u, err := c.buildUrl(source, cursor)
if err != nil {