···
"tangled.sh/tangled.sh/core/log"
"github.com/gorilla/websocket"
···
// do not full deserialize this portion of the message, processFunc can do that
-
EventJson json.RawMessage
type ConsumerConfig struct {
···
type EventSource struct {
···
func (e *EventConsumer) buildUrl(s EventSource, cursor string) (*url.URL, error) {
···
dialer: websocket.DefaultDialer,
···
func (c *EventConsumer) Start(ctx context.Context) {
for range c.cfg.WorkerCount {
···
c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err)
if err := c.cfg.ProcessFunc(j.source, msg); err != nil {
c.logger.Error("error processing message", "source", j.source, "err", err)
···
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
-
u, err := url.Parse(source)
u, err := c.buildUrl(source, cursor)
···
+
"tangled.sh/tangled.sh/core/appview/cache"
"tangled.sh/tangled.sh/core/log"
"github.com/gorilla/websocket"
···
// do not full deserialize this portion of the message, processFunc can do that
+
EventJson json.RawMessage `json:"event"`
type ConsumerConfig struct {
···
+
CursorStore CursorStore
type EventSource struct {
···
+
type CursorStore interface {
+
Set(knot, cursor string)
+
Get(knot string) (cursor string)
+
type RedisCursorStore struct {
+
func NewRedisCursorStore(cache *cache.Cache) RedisCursorStore {
+
return RedisCursorStore{
+
cursorKey = "cursor:%s"
+
func (r *RedisCursorStore) Set(knot, cursor string) {
+
key := fmt.Sprintf(cursorKey, knot)
+
r.rdb.Set(context.Background(), key, cursor, 0)
+
func (r *RedisCursorStore) Get(knot string) (cursor string) {
+
key := fmt.Sprintf(cursorKey, knot)
+
val, err := r.rdb.Get(context.Background(), key).Result()
+
type MemoryCursorStore struct {
+
func (m *MemoryCursorStore) Set(knot, cursor string) {
+
m.store.Store(knot, cursor)
+
func (m *MemoryCursorStore) Get(knot string) (cursor string) {
+
if result, ok := m.store.Load(knot); ok {
+
if val, ok := result.(string); ok {
func (e *EventConsumer) buildUrl(s EventSource, cursor string) (*url.URL, error) {
···
+
if cfg.CursorStore == nil {
+
cfg.CursorStore = &MemoryCursorStore{}
dialer: websocket.DefaultDialer,
···
func (c *EventConsumer) Start(ctx context.Context) {
+
c.cfg.Logger.Info("starting consumer", "config", c.cfg)
for range c.cfg.WorkerCount {
···
c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err)
+
c.cfg.CursorStore.Set(j.source.Knot, msg.Rkey)
if err := c.cfg.ProcessFunc(j.source, msg); err != nil {
c.logger.Error("error processing message", "source", j.source, "err", err)
···
connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
+
cursor := c.cfg.CursorStore.Get(source.Knot)
u, err := c.buildUrl(source, cursor)