forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package knotclient 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "math/rand" 9 "net/url" 10 "strconv" 11 "sync" 12 "time" 13 14 "tangled.sh/tangled.sh/core/appview/cache" 15 "tangled.sh/tangled.sh/core/log" 16 17 "github.com/gorilla/websocket" 18) 19 20type ProcessFunc func(source EventSource, message Message) error 21 22type Message struct { 23 Rkey string 24 Nsid string 25 // do not full deserialize this portion of the message, processFunc can do that 26 EventJson json.RawMessage `json:"event"` 27} 28 29type ConsumerConfig struct { 30 Sources map[EventSource]struct{} 31 ProcessFunc ProcessFunc 32 RetryInterval time.Duration 33 MaxRetryInterval time.Duration 34 ConnectionTimeout time.Duration 35 WorkerCount int 36 QueueSize int 37 Logger *slog.Logger 38 Dev bool 39 CursorStore CursorStore 40} 41 42type EventSource struct { 43 Knot string 44} 45 46func NewEventSource(knot string) EventSource { 47 return EventSource{ 48 Knot: knot, 49 } 50} 51 52type EventConsumer struct { 53 cfg ConsumerConfig 54 wg sync.WaitGroup 55 dialer *websocket.Dialer 56 connMap sync.Map 57 jobQueue chan job 58 logger *slog.Logger 59 randSource *rand.Rand 60 61 // rw lock over edits to consumer config 62 mu sync.RWMutex 63} 64 65type CursorStore interface { 66 Set(knot string, cursor int64) 67 Get(knot string) (cursor int64) 68} 69 70type RedisCursorStore struct { 71 rdb *cache.Cache 72} 73 74func NewRedisCursorStore(cache *cache.Cache) RedisCursorStore { 75 return RedisCursorStore{ 76 rdb: cache, 77 } 78} 79 80const ( 81 cursorKey = "cursor:%s" 82) 83 84func (r *RedisCursorStore) Set(knot string, cursor int64) { 85 key := fmt.Sprintf(cursorKey, knot) 86 r.rdb.Set(context.Background(), key, cursor, 0) 87} 88 89func (r *RedisCursorStore) Get(knot string) (cursor int64) { 90 key := fmt.Sprintf(cursorKey, knot) 91 val, err := r.rdb.Get(context.Background(), key).Result() 92 if err != nil { 93 return 0 94 } 95 96 cursor, err = strconv.ParseInt(val, 10, 64) 97 if err != nil { 98 return 0 // optionally log parsing error 99 } 100 101 return cursor 102} 103 104type MemoryCursorStore struct { 105 store sync.Map 106} 107 108func (m *MemoryCursorStore) Set(knot string, cursor int64) { 109 m.store.Store(knot, cursor) 110} 111 112func (m *MemoryCursorStore) Get(knot string) (cursor int64) { 113 if result, ok := m.store.Load(knot); ok { 114 if val, ok := result.(int64); ok { 115 return val 116 } 117 } 118 119 return 0 120} 121 122func (e *EventConsumer) buildUrl(s EventSource, cursor int64) (*url.URL, error) { 123 scheme := "wss" 124 if e.cfg.Dev { 125 scheme = "ws" 126 } 127 128 u, err := url.Parse(scheme + "://" + s.Knot + "/events") 129 if err != nil { 130 return nil, err 131 } 132 133 if cursor != 0 { 134 query := url.Values{} 135 query.Add("cursor", fmt.Sprintf("%d", cursor)) 136 u.RawQuery = query.Encode() 137 } 138 return u, nil 139} 140 141type job struct { 142 source EventSource 143 message []byte 144} 145 146func NewEventConsumer(cfg ConsumerConfig) *EventConsumer { 147 if cfg.RetryInterval == 0 { 148 cfg.RetryInterval = 15 * time.Minute 149 } 150 if cfg.ConnectionTimeout == 0 { 151 cfg.ConnectionTimeout = 10 * time.Second 152 } 153 if cfg.WorkerCount <= 0 { 154 cfg.WorkerCount = 5 155 } 156 if cfg.MaxRetryInterval == 0 { 157 cfg.MaxRetryInterval = 1 * time.Hour 158 } 159 if cfg.Logger == nil { 160 cfg.Logger = log.New("eventconsumer") 161 } 162 if cfg.QueueSize == 0 { 163 cfg.QueueSize = 100 164 } 165 if cfg.CursorStore == nil { 166 cfg.CursorStore = &MemoryCursorStore{} 167 } 168 return &EventConsumer{ 169 cfg: cfg, 170 dialer: websocket.DefaultDialer, 171 jobQueue: make(chan job, cfg.QueueSize), // buffered job queue 172 logger: cfg.Logger, 173 randSource: rand.New(rand.NewSource(time.Now().UnixNano())), 174 } 175} 176 177func (c *EventConsumer) Start(ctx context.Context) { 178 c.cfg.Logger.Info("starting consumer", "config", c.cfg) 179 180 // start workers 181 for range c.cfg.WorkerCount { 182 c.wg.Add(1) 183 go c.worker(ctx) 184 } 185 186 // start streaming 187 for source := range c.cfg.Sources { 188 c.wg.Add(1) 189 go c.startConnectionLoop(ctx, source) 190 } 191} 192 193func (c *EventConsumer) Stop() { 194 c.connMap.Range(func(_, val any) bool { 195 if conn, ok := val.(*websocket.Conn); ok { 196 conn.Close() 197 } 198 return true 199 }) 200 c.wg.Wait() 201 close(c.jobQueue) 202} 203 204func (c *EventConsumer) AddSource(ctx context.Context, s EventSource) { 205 c.mu.Lock() 206 c.cfg.Sources[s] = struct{}{} 207 c.wg.Add(1) 208 go c.startConnectionLoop(ctx, s) 209 c.mu.Unlock() 210} 211 212func (c *EventConsumer) worker(ctx context.Context) { 213 defer c.wg.Done() 214 for { 215 select { 216 case <-ctx.Done(): 217 return 218 case j, ok := <-c.jobQueue: 219 if !ok { 220 return 221 } 222 223 var msg Message 224 err := json.Unmarshal(j.message, &msg) 225 if err != nil { 226 c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err) 227 return 228 } 229 230 // update cursor 231 c.cfg.CursorStore.Set(j.source.Knot, time.Now().Unix()) 232 233 if err := c.cfg.ProcessFunc(j.source, msg); err != nil { 234 c.logger.Error("error processing message", "source", j.source, "err", err) 235 } 236 } 237 } 238} 239 240func (c *EventConsumer) startConnectionLoop(ctx context.Context, source EventSource) { 241 defer c.wg.Done() 242 retryInterval := c.cfg.RetryInterval 243 for { 244 select { 245 case <-ctx.Done(): 246 return 247 default: 248 err := c.runConnection(ctx, source) 249 if err != nil { 250 c.logger.Error("connection failed", "source", source, "err", err) 251 } 252 253 // apply jitter 254 jitter := time.Duration(c.randSource.Int63n(int64(retryInterval) / 5)) 255 delay := retryInterval + jitter 256 257 if retryInterval < c.cfg.MaxRetryInterval { 258 retryInterval *= 2 259 if retryInterval > c.cfg.MaxRetryInterval { 260 retryInterval = c.cfg.MaxRetryInterval 261 } 262 } 263 c.logger.Info("retrying connection", "source", source, "delay", delay) 264 select { 265 case <-time.After(delay): 266 case <-ctx.Done(): 267 return 268 } 269 } 270 } 271} 272 273func (c *EventConsumer) runConnection(ctx context.Context, source EventSource) error { 274 connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) 275 defer cancel() 276 277 cursor := c.cfg.CursorStore.Get(source.Knot) 278 279 u, err := c.buildUrl(source, cursor) 280 if err != nil { 281 return err 282 } 283 284 c.logger.Info("connecting", "url", u.String()) 285 conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil) 286 if err != nil { 287 return err 288 } 289 defer conn.Close() 290 c.connMap.Store(source, conn) 291 defer c.connMap.Delete(source) 292 293 c.logger.Info("connected", "source", source) 294 295 for { 296 select { 297 case <-ctx.Done(): 298 return nil 299 default: 300 msgType, msg, err := conn.ReadMessage() 301 if err != nil { 302 return err 303 } 304 if msgType != websocket.TextMessage { 305 continue 306 } 307 select { 308 case c.jobQueue <- job{source: source, message: msg}: 309 case <-ctx.Done(): 310 return nil 311 } 312 } 313 } 314}