···
13
+
"tangled.sh/tangled.sh/core/appview/cache"
"tangled.sh/tangled.sh/core/log"
"github.com/gorilla/websocket"
···
// do not full deserialize this portion of the message, processFunc can do that
23
-
EventJson json.RawMessage
25
+
EventJson json.RawMessage `json:"event"`
type ConsumerConfig struct {
···
38
+
CursorStore CursorStore
type EventSource struct {
···
61
+
type CursorStore interface {
62
+
Set(knot, cursor string)
63
+
Get(knot string) (cursor string)
66
+
type RedisCursorStore struct {
70
+
func NewRedisCursorStore(cache *cache.Cache) RedisCursorStore {
71
+
return RedisCursorStore{
77
+
cursorKey = "cursor:%s"
80
+
func (r *RedisCursorStore) Set(knot, cursor string) {
81
+
key := fmt.Sprintf(cursorKey, knot)
82
+
r.rdb.Set(context.Background(), key, cursor, 0)
85
+
func (r *RedisCursorStore) Get(knot string) (cursor string) {
86
+
key := fmt.Sprintf(cursorKey, knot)
87
+
val, err := r.rdb.Get(context.Background(), key).Result()
95
+
type MemoryCursorStore struct {
99
+
func (m *MemoryCursorStore) Set(knot, cursor string) {
100
+
m.store.Store(knot, cursor)
103
+
func (m *MemoryCursorStore) Get(knot string) (cursor string) {
104
+
if result, ok := m.store.Load(knot); ok {
105
+
if val, ok := result.(string); ok {
func (e *EventConsumer) buildUrl(s EventSource, cursor string) (*url.URL, error) {
···
156
+
if cfg.CursorStore == nil {
157
+
cfg.CursorStore = &MemoryCursorStore{}
dialer: websocket.DefaultDialer,
···
func (c *EventConsumer) Start(ctx context.Context) {
169
+
c.cfg.Logger.Info("starting consumer", "config", c.cfg)
for range c.cfg.WorkerCount {
···
c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err)
214
+
c.cfg.CursorStore.Set(j.source.Knot, msg.Rkey)
if err := c.cfg.ProcessFunc(j.source, msg); err != nil {
c.logger.Error("error processing message", "source", j.source, "err", err)
···
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
196
-
u, err := url.Parse(source)
260
+
cursor := c.cfg.CursorStore.Get(source.Knot)
u, err := c.buildUrl(source, cursor)