···
13
+
"tangled.sh/tangled.sh/core/appview/cache"
"tangled.sh/tangled.sh/core/log"
"github.com/gorilla/websocket"
···
// do not full deserialize this portion of the message, processFunc can do that
23
-
EventJson json.RawMessage
25
+
EventJson json.RawMessage `json:"event"`
type ConsumerConfig struct {
···
38
+
CursorStore CursorStore
type EventSource struct {
···
64
+
type CursorStore interface {
65
+
Set(knot, cursor string)
66
+
Get(knot string) (cursor string)
69
+
type RedisCursorStore struct {
73
+
func NewRedisCursorStore(cache *cache.Cache) RedisCursorStore {
74
+
return RedisCursorStore{
80
+
cursorKey = "cursor:%s"
83
+
func (r *RedisCursorStore) Set(knot, cursor string) {
84
+
key := fmt.Sprintf(cursorKey, knot)
85
+
r.rdb.Set(context.Background(), key, cursor, 0)
88
+
func (r *RedisCursorStore) Get(knot string) (cursor string) {
89
+
key := fmt.Sprintf(cursorKey, knot)
90
+
val, err := r.rdb.Get(context.Background(), key).Result()
98
+
type MemoryCursorStore struct {
102
+
func (m *MemoryCursorStore) Set(knot, cursor string) {
103
+
m.store.Store(knot, cursor)
106
+
func (m *MemoryCursorStore) Get(knot string) (cursor string) {
107
+
if result, ok := m.store.Load(knot); ok {
108
+
if val, ok := result.(string); ok {
func (e *EventConsumer) buildUrl(s EventSource, cursor string) (*url.URL, error) {
···
159
+
if cfg.CursorStore == nil {
160
+
cfg.CursorStore = &MemoryCursorStore{}
dialer: websocket.DefaultDialer,
···
func (c *EventConsumer) Start(ctx context.Context) {
172
+
c.cfg.Logger.Info("starting consumer", "config", c.cfg)
for range c.cfg.WorkerCount {
···
c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err)
225
+
c.cfg.CursorStore.Set(j.source.Knot, msg.Rkey)
if err := c.cfg.ProcessFunc(j.source, msg); err != nil {
c.logger.Error("error processing message", "source", j.source, "err", err)
···
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
207
-
u, err := url.Parse(source)
271
+
cursor := c.cfg.CursorStore.Get(source.Knot)
u, err := c.buildUrl(source, cursor)