forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package knotclient
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "math/rand"
9 "net/url"
10 "strconv"
11 "sync"
12 "time"
13
14 "tangled.sh/tangled.sh/core/appview/cache"
15 "tangled.sh/tangled.sh/core/log"
16
17 "github.com/gorilla/websocket"
18)
19
// ProcessFunc handles one event read from a knot's event stream. It receives
// the source the event was read from and the decoded envelope; the event body
// itself is still raw JSON in message.EventJson. A returned error is logged by
// the consumer's worker and does not stop processing of later messages.
type ProcessFunc func(source EventSource, message Message) error
21
// Message is the envelope of a single event received over a knot's /events
// websocket. Only the routing fields are decoded eagerly; the event body is
// kept as raw JSON so that the ProcessFunc can decode it into the
// appropriate type itself.
type Message struct {
	// Rkey and Nsid carry no struct tags; encoding/json matches them
	// case-insensitively against the incoming object keys.
	Rkey, Nsid string

	// EventJson is the undecoded "event" field of the incoming message.
	EventJson json.RawMessage `json:"event"`
}
28
// ConsumerConfig configures an EventConsumer. Most zero-valued fields are
// replaced with defaults by NewEventConsumer; Sources, ProcessFunc and Dev are
// used as given.
type ConsumerConfig struct {
	// Sources is the set of knots streamed from when Start is called.
	// AddSource also inserts into this map.
	Sources map[EventSource]struct{}
	// ProcessFunc is called by a worker for every successfully decoded message.
	ProcessFunc ProcessFunc
	// RetryInterval is the initial reconnect delay. The connection loop
	// doubles it after each attempt, capped at MaxRetryInterval, and adds a
	// random jitter of up to 20%.
	RetryInterval    time.Duration
	MaxRetryInterval time.Duration
	// ConnectionTimeout bounds the websocket dial (not the lifetime of the
	// established connection).
	ConnectionTimeout time.Duration
	// WorkerCount is the number of goroutines running ProcessFunc.
	WorkerCount int
	// QueueSize is the buffer size of the queue between connection readers
	// and workers.
	QueueSize int
	Logger    *slog.Logger
	// Dev selects ws:// instead of wss:// for the event stream URL.
	Dev bool
	// CursorStore persists the per-knot resume cursor.
	CursorStore CursorStore
}
41
// EventSource identifies one knot whose event stream is consumed. It is a
// comparable value type and is used directly as a map key (the configured
// source set and the live-connection map).
type EventSource struct {
	// Knot is spliced verbatim into the stream URL as
	// "<scheme>://<Knot>/events" — presumably a host[:port]; confirm with callers.
	Knot string
}

// NewEventSource returns the EventSource for the given knot.
func NewEventSource(knot string) EventSource {
	var src EventSource
	src.Knot = knot
	return src
}
51
// EventConsumer streams events from a set of knots over websockets and fans
// the raw messages out to a pool of workers that call
// ConsumerConfig.ProcessFunc. Construct it with NewEventConsumer.
type EventConsumer struct {
	cfg ConsumerConfig
	// wg tracks worker and connection-loop goroutines so Stop can wait for them.
	wg     sync.WaitGroup
	dialer *websocket.Dialer
	// connMap maps each EventSource to its live *websocket.Conn so Stop can
	// close them.
	connMap sync.Map
	// jobQueue carries raw messages from connection readers to workers.
	jobQueue chan job
	logger   *slog.Logger
	// randSource is seeded in NewEventConsumer for reconnect jitter.
	// NOTE(review): *rand.Rand is not safe for concurrent use; any use shared
	// across connection-loop goroutines must be serialized.
	randSource *rand.Rand

	// rw lock over edits to consumer config
	mu sync.RWMutex
}
64
// CursorStore persists the last-seen position in each knot's event stream so
// that a reconnect can resume with ?cursor=<value>. A cursor of 0 is treated
// as "no cursor" when building the stream URL; the implementations in this
// package return 0 for unknown knots.
type CursorStore interface {
	// Get returns the stored cursor for knot.
	Get(knot string) int64
	// Set records cursor as the latest position for knot.
	Set(knot string, cursor int64)
}
69
70type RedisCursorStore struct {
71 rdb *cache.Cache
72}
73
74func NewRedisCursorStore(cache *cache.Cache) RedisCursorStore {
75 return RedisCursorStore{
76 rdb: cache,
77 }
78}
79
80const (
81 cursorKey = "cursor:%s"
82)
83
84func (r *RedisCursorStore) Set(knot string, cursor int64) {
85 key := fmt.Sprintf(cursorKey, knot)
86 r.rdb.Set(context.Background(), key, cursor, 0)
87}
88
89func (r *RedisCursorStore) Get(knot string) (cursor int64) {
90 key := fmt.Sprintf(cursorKey, knot)
91 val, err := r.rdb.Get(context.Background(), key).Result()
92 if err != nil {
93 return 0
94 }
95
96 cursor, err = strconv.ParseInt(val, 10, 64)
97 if err != nil {
98 return 0 // optionally log parsing error
99 }
100
101 return cursor
102}
103
// MemoryCursorStore is an in-process CursorStore; cursors do not survive a
// restart. It is safe for concurrent use, and its zero value is ready to use.
// It must not be copied after first use because it embeds a sync.Map.
type MemoryCursorStore struct {
	store sync.Map // knot (string) -> cursor (int64)
}

// Set records cursor for knot, replacing any previous value.
func (m *MemoryCursorStore) Set(knot string, cursor int64) {
	m.store.Store(knot, cursor)
}

// Get returns the cursor recorded for knot, or 0 if there is none.
func (m *MemoryCursorStore) Get(knot string) (cursor int64) {
	raw, found := m.store.Load(knot)
	if !found {
		return 0
	}
	cursor, _ = raw.(int64) // stays 0 if the stored value is not an int64
	return cursor
}
121
122func (e *EventConsumer) buildUrl(s EventSource, cursor int64) (*url.URL, error) {
123 scheme := "wss"
124 if e.cfg.Dev {
125 scheme = "ws"
126 }
127
128 u, err := url.Parse(scheme + "://" + s.Knot + "/events")
129 if err != nil {
130 return nil, err
131 }
132
133 if cursor != 0 {
134 query := url.Values{}
135 query.Add("cursor", fmt.Sprintf("%d", cursor))
136 u.RawQuery = query.Encode()
137 }
138 return u, nil
139}
140
// job is one websocket text frame queued for a worker, tagged with the
// source it was read from.
type job struct {
	source  EventSource
	message []byte // undecoded JSON; the worker unmarshals it into a Message
}
145
146func NewEventConsumer(cfg ConsumerConfig) *EventConsumer {
147 if cfg.RetryInterval == 0 {
148 cfg.RetryInterval = 15 * time.Minute
149 }
150 if cfg.ConnectionTimeout == 0 {
151 cfg.ConnectionTimeout = 10 * time.Second
152 }
153 if cfg.WorkerCount <= 0 {
154 cfg.WorkerCount = 5
155 }
156 if cfg.MaxRetryInterval == 0 {
157 cfg.MaxRetryInterval = 1 * time.Hour
158 }
159 if cfg.Logger == nil {
160 cfg.Logger = log.New("eventconsumer")
161 }
162 if cfg.QueueSize == 0 {
163 cfg.QueueSize = 100
164 }
165 if cfg.CursorStore == nil {
166 cfg.CursorStore = &MemoryCursorStore{}
167 }
168 return &EventConsumer{
169 cfg: cfg,
170 dialer: websocket.DefaultDialer,
171 jobQueue: make(chan job, cfg.QueueSize), // buffered job queue
172 logger: cfg.Logger,
173 randSource: rand.New(rand.NewSource(time.Now().UnixNano())),
174 }
175}
176
177func (c *EventConsumer) Start(ctx context.Context) {
178 c.cfg.Logger.Info("starting consumer", "config", c.cfg)
179
180 // start workers
181 for range c.cfg.WorkerCount {
182 c.wg.Add(1)
183 go c.worker(ctx)
184 }
185
186 // start streaming
187 for source := range c.cfg.Sources {
188 c.wg.Add(1)
189 go c.startConnectionLoop(ctx, source)
190 }
191}
192
193func (c *EventConsumer) Stop() {
194 c.connMap.Range(func(_, val any) bool {
195 if conn, ok := val.(*websocket.Conn); ok {
196 conn.Close()
197 }
198 return true
199 })
200 c.wg.Wait()
201 close(c.jobQueue)
202}
203
204func (c *EventConsumer) AddSource(ctx context.Context, s EventSource) {
205 c.mu.Lock()
206 c.cfg.Sources[s] = struct{}{}
207 c.wg.Add(1)
208 go c.startConnectionLoop(ctx, s)
209 c.mu.Unlock()
210}
211
212func (c *EventConsumer) worker(ctx context.Context) {
213 defer c.wg.Done()
214 for {
215 select {
216 case <-ctx.Done():
217 return
218 case j, ok := <-c.jobQueue:
219 if !ok {
220 return
221 }
222
223 var msg Message
224 err := json.Unmarshal(j.message, &msg)
225 if err != nil {
226 c.logger.Error("error deserializing message", "source", j.source.Knot, "err", err)
227 return
228 }
229
230 // update cursor
231 c.cfg.CursorStore.Set(j.source.Knot, time.Now().Unix())
232
233 if err := c.cfg.ProcessFunc(j.source, msg); err != nil {
234 c.logger.Error("error processing message", "source", j.source, "err", err)
235 }
236 }
237 }
238}
239
240func (c *EventConsumer) startConnectionLoop(ctx context.Context, source EventSource) {
241 defer c.wg.Done()
242 retryInterval := c.cfg.RetryInterval
243 for {
244 select {
245 case <-ctx.Done():
246 return
247 default:
248 err := c.runConnection(ctx, source)
249 if err != nil {
250 c.logger.Error("connection failed", "source", source, "err", err)
251 }
252
253 // apply jitter
254 jitter := time.Duration(c.randSource.Int63n(int64(retryInterval) / 5))
255 delay := retryInterval + jitter
256
257 if retryInterval < c.cfg.MaxRetryInterval {
258 retryInterval *= 2
259 if retryInterval > c.cfg.MaxRetryInterval {
260 retryInterval = c.cfg.MaxRetryInterval
261 }
262 }
263 c.logger.Info("retrying connection", "source", source, "delay", delay)
264 select {
265 case <-time.After(delay):
266 case <-ctx.Done():
267 return
268 }
269 }
270 }
271}
272
273func (c *EventConsumer) runConnection(ctx context.Context, source EventSource) error {
274 connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout)
275 defer cancel()
276
277 cursor := c.cfg.CursorStore.Get(source.Knot)
278
279 u, err := c.buildUrl(source, cursor)
280 if err != nil {
281 return err
282 }
283
284 c.logger.Info("connecting", "url", u.String())
285 conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil)
286 if err != nil {
287 return err
288 }
289 defer conn.Close()
290 c.connMap.Store(source, conn)
291 defer c.connMap.Delete(source)
292
293 c.logger.Info("connected", "source", source)
294
295 for {
296 select {
297 case <-ctx.Done():
298 return nil
299 default:
300 msgType, msg, err := conn.ReadMessage()
301 if err != nil {
302 return err
303 }
304 if msgType != websocket.TextMessage {
305 continue
306 }
307 select {
308 case c.jobQueue <- job{source: source, message: msg}:
309 case <-ctx.Done():
310 return nil
311 }
312 }
313 }
314}