forked from tangled.org/core
Monorepo for Tangled — https://tangled.org

appview,knotclient: refactor cursor store to separate package

Signed-off-by: oppiliappan <me@oppi.li>

oppi.li 60444e90 5cd3fc39

verified
Changed files
+78 -63
appview
knotclient
nix
+2 -1
appview/state/knotstream.go
···
"tangled.sh/tangled.sh/core/appview/config"
"tangled.sh/tangled.sh/core/appview/db"
kc "tangled.sh/tangled.sh/core/knotclient"
"tangled.sh/tangled.sh/core/log"
"tangled.sh/tangled.sh/core/rbac"
···
logger := log.New("knotstream")
cache := cache.New(c.Redis.Addr)
-
cursorStore := kc.NewRedisCursorStore(cache)
cfg := kc.ConsumerConfig{
Sources: srcs,
···
"tangled.sh/tangled.sh/core/appview/config"
"tangled.sh/tangled.sh/core/appview/db"
kc "tangled.sh/tangled.sh/core/knotclient"
+
"tangled.sh/tangled.sh/core/knotclient/cursor"
"tangled.sh/tangled.sh/core/log"
"tangled.sh/tangled.sh/core/rbac"
···
logger := log.New("knotstream")
cache := cache.New(c.Redis.Addr)
+
cursorStore := cursor.NewRedisCursorStore(cache)
cfg := kc.ConsumerConfig{
Sources: srcs,
+23
knotclient/cursor/memory.go
···
···
+
package cursor
+
+
import (
+
"sync"
+
)
+
+
type MemoryStore struct {
+
store sync.Map
+
}
+
+
func (m *MemoryStore) Set(knot string, cursor int64) {
+
m.store.Store(knot, cursor)
+
}
+
+
func (m *MemoryStore) Get(knot string) (cursor int64) {
+
if result, ok := m.store.Load(knot); ok {
+
if val, ok := result.(int64); ok {
+
return val
+
}
+
}
+
+
return 0
+
}
+43
knotclient/cursor/redis.go
···
···
+
package cursor
+
+
import (
+
"context"
+
"fmt"
+
"strconv"
+
+
"tangled.sh/tangled.sh/core/appview/cache"
+
)
+
+
const (
+
cursorKey = "cursor:%s"
+
)
+
+
type RedisStore struct {
+
rdb *cache.Cache
+
}
+
+
func NewRedisCursorStore(cache *cache.Cache) RedisStore {
+
return RedisStore{
+
rdb: cache,
+
}
+
}
+
+
func (r *RedisStore) Set(knot string, cursor int64) {
+
key := fmt.Sprintf(cursorKey, knot)
+
r.rdb.Set(context.Background(), key, cursor, 0)
+
}
+
+
func (r *RedisStore) Get(knot string) (cursor int64) {
+
key := fmt.Sprintf(cursorKey, knot)
+
val, err := r.rdb.Get(context.Background(), key).Result()
+
if err != nil {
+
return 0
+
}
+
cursor, err = strconv.ParseInt(val, 10, 64)
+
if err != nil {
+
// TODO: log here
+
return 0
+
}
+
+
return cursor
+
}
+6
knotclient/cursor/store.go
···
···
+
package cursor
+
+
type Store interface {
+
Set(knot string, cursor int64)
+
Get(knot string) (cursor int64)
+
}
+3 -61
knotclient/events.go
···
"log/slog"
"math/rand"
"net/url"
-
"strconv"
"sync"
"time"
-
"tangled.sh/tangled.sh/core/appview/cache"
"tangled.sh/tangled.sh/core/log"
"github.com/gorilla/websocket"
···
QueueSize int
Logger *slog.Logger
Dev bool
-
CursorStore CursorStore
}
func NewConsumerConfig() *ConsumerConfig {
···
cfg ConsumerConfig
}
-
type CursorStore interface {
-
Set(knot string, cursor int64)
-
Get(knot string) (cursor int64)
-
}
-
-
type RedisCursorStore struct {
-
rdb *cache.Cache
-
}
-
-
func NewRedisCursorStore(cache *cache.Cache) RedisCursorStore {
-
return RedisCursorStore{
-
rdb: cache,
-
}
-
}
-
-
const (
-
cursorKey = "cursor:%s"
-
)
-
-
func (r *RedisCursorStore) Set(knot string, cursor int64) {
-
key := fmt.Sprintf(cursorKey, knot)
-
r.rdb.Set(context.Background(), key, cursor, 0)
-
}
-
-
func (r *RedisCursorStore) Get(knot string) (cursor int64) {
-
key := fmt.Sprintf(cursorKey, knot)
-
val, err := r.rdb.Get(context.Background(), key).Result()
-
if err != nil {
-
return 0
-
}
-
-
cursor, err = strconv.ParseInt(val, 10, 64)
-
if err != nil {
-
return 0 // optionally log parsing error
-
}
-
-
return cursor
-
}
-
-
type MemoryCursorStore struct {
-
store sync.Map
-
}
-
-
func (m *MemoryCursorStore) Set(knot string, cursor int64) {
-
m.store.Store(knot, cursor)
-
}
-
-
func (m *MemoryCursorStore) Get(knot string) (cursor int64) {
-
if result, ok := m.store.Load(knot); ok {
-
if val, ok := result.(int64); ok {
-
return val
-
}
-
}
-
-
return 0
-
}
-
func (e *EventConsumer) buildUrl(s EventSource, cursor int64) (*url.URL, error) {
scheme := "wss"
if e.cfg.Dev {
···
cfg.QueueSize = 100
}
if cfg.CursorStore == nil {
-
cfg.CursorStore = &MemoryCursorStore{}
}
return &EventConsumer{
cfg: cfg,
···
"log/slog"
"math/rand"
"net/url"
"sync"
"time"
+
"tangled.sh/tangled.sh/core/knotclient/cursor"
"tangled.sh/tangled.sh/core/log"
"github.com/gorilla/websocket"
···
QueueSize int
Logger *slog.Logger
Dev bool
+
CursorStore cursor.Store
}
func NewConsumerConfig() *ConsumerConfig {
···
cfg ConsumerConfig
}
func (e *EventConsumer) buildUrl(s EventSource, cursor int64) (*url.URL, error) {
scheme := "wss"
if e.cfg.Dev {
···
cfg.QueueSize = 100
}
if cfg.CursorStore == nil {
+
cfg.CursorStore = &cursor.MemoryStore{}
}
return &EventConsumer{
cfg: cfg,
+1 -1
nix/vm.nix
···
g = config.services.tangled-knot.gitUser;
in [
"d /var/lib/knot 0770 ${u} ${g} - -" # Create the directory first
-
"f+ /var/lib/knot/secret 0660 ${u} ${g} - KNOT_SERVER_SECRET=16154910ef55fe48121082c0b51fc0e360a8b15eb7bda7991d88dc9f7684427a"
];
services.tangled-knot = {
enable = true;
···
g = config.services.tangled-knot.gitUser;
in [
"d /var/lib/knot 0770 ${u} ${g} - -" # Create the directory first
+
"f+ /var/lib/knot/secret 0660 ${u} ${g} - KNOT_SERVER_SECRET=2650ecafdce279b09865fb1923051156eb773ee7485061b2e766086f07dbd85a"
];
services.tangled-knot = {
enable = true;