forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package jetstream
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "os"
8 "os/signal"
9 "sync"
10 "syscall"
11 "time"
12
13 "github.com/bluesky-social/jetstream/pkg/client"
14 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
15 "github.com/bluesky-social/jetstream/pkg/models"
16 "tangled.sh/tangled.sh/core/log"
17)
18
// DB persists the jetstream cursor: the time_us value, in Unix microseconds,
// from which reading resumes after a reconnect or restart.
type DB interface {
	// GetLastTimeUs returns the stored cursor. When it returns a non-nil
	// error, the client falls back to the current time.
	GetLastTimeUs() (int64, error)
	// SaveLastTimeUs stores timeUs as the new cursor.
	SaveLastTimeUs(timeUs int64) error
}
23
// Set is a set of comparable values, stored as map keys with zero-width
// struct{} values.
type Set[T comparable] map[T]struct{}
25
// JetstreamClient wraps a jetstream client with cursor persistence (via db),
// an optional in-memory DID filter, and reconnect handling.
//
// It contains mutexes, so it must not be copied; use the *JetstreamClient
// returned by NewJetstreamClient.
type JetstreamClient struct {
	cfg    *client.ClientConfig // passed to client.NewClient in StartJetstream
	client *client.Client       // created by StartJetstream
	ident  string               // identifier handed to the sequential scheduler
	l      *slog.Logger

	logDids    bool         // when true, AddDid logs every DID it adds
	wantedDids Set[string]  // DID filter; an empty set accepts every DID
	db         DB           // loads and stores the time_us cursor
	waitForDid bool         // when true, StartJetstream waits for a DID before connecting
	mu         sync.RWMutex // protects wantedDids (AddDid writes under it)

	cancel   context.CancelFunc // cancels the context of the current connection
	cancelMu sync.Mutex         // protects cancel
}
41
42func (j *JetstreamClient) AddDid(did string) {
43 if did == "" {
44 return
45 }
46
47 if j.logDids {
48 j.l.Info("adding did to in-memory filter", "did", did)
49 }
50 j.mu.Lock()
51 j.wantedDids[did] = struct{}{}
52 j.mu.Unlock()
53}
54
// processor handles a single jetstream event; it is the callback shape that
// withDidFilter wraps.
type processor func(context.Context, *models.Event) error
56
57func (j *JetstreamClient) withDidFilter(processFunc processor) processor {
58 // empty filter => all dids allowed
59 if len(j.wantedDids) == 0 {
60 return processFunc
61 }
62 // since this closure references j.WantedDids; it should auto-update
63 // existing instances of the closure when j.WantedDids is mutated
64 return func(ctx context.Context, evt *models.Event) error {
65 if _, ok := j.wantedDids[evt.Did]; ok {
66 return processFunc(ctx, evt)
67 } else {
68 return nil
69 }
70 }
71}
72
73func NewJetstreamClient(endpoint, ident string, collections []string, cfg *client.ClientConfig, logger *slog.Logger, db DB, waitForDid, logDids bool) (*JetstreamClient, error) {
74 if cfg == nil {
75 cfg = client.DefaultClientConfig()
76 cfg.WebsocketURL = endpoint
77 cfg.WantedCollections = collections
78 }
79
80 return &JetstreamClient{
81 cfg: cfg,
82 ident: ident,
83 db: db,
84 l: logger,
85 wantedDids: make(map[string]struct{}),
86
87 logDids: logDids,
88
89 // This will make the goroutine in StartJetstream wait until
90 // j.wantedDids has been populated, typically using addDids.
91 waitForDid: waitForDid,
92 }, nil
93}
94
95// StartJetstream starts the jetstream client and processes events using the provided processFunc.
96// The caller is responsible for saving the last time_us to the database (just use your db.UpdateLastTimeUs).
97func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error {
98 logger := j.l
99
100 sched := sequential.NewScheduler(j.ident, logger, j.withDidFilter(processFunc))
101
102 client, err := client.NewClient(j.cfg, log.New("jetstream"), sched)
103 if err != nil {
104 return fmt.Errorf("failed to create jetstream client: %w", err)
105 }
106 j.client = client
107
108 go func() {
109 if j.waitForDid {
110 for len(j.wantedDids) == 0 {
111 time.Sleep(time.Second)
112 }
113 }
114 logger.Info("done waiting for did")
115
116 go j.periodicLastTimeSave(ctx)
117 j.saveIfKilled(ctx)
118
119 j.connectAndRead(ctx)
120 }()
121
122 return nil
123}
124
125func (j *JetstreamClient) connectAndRead(ctx context.Context) {
126 l := log.FromContext(ctx)
127 for {
128 cursor := j.getLastTimeUs(ctx)
129
130 connCtx, cancel := context.WithCancel(ctx)
131 j.cancelMu.Lock()
132 j.cancel = cancel
133 j.cancelMu.Unlock()
134
135 if err := j.client.ConnectAndRead(connCtx, cursor); err != nil {
136 l.Error("error reading jetstream", "error", err)
137 cancel()
138 continue
139 }
140
141 select {
142 case <-ctx.Done():
143 l.Info("context done, stopping jetstream")
144 return
145 case <-connCtx.Done():
146 l.Info("connection context done, reconnecting")
147 continue
148 }
149 }
150}
151
152// save cursor periodically
153func (j *JetstreamClient) periodicLastTimeSave(ctx context.Context) {
154 ticker := time.NewTicker(time.Minute)
155 defer ticker.Stop()
156
157 for {
158 select {
159 case <-ctx.Done():
160 return
161 case <-ticker.C:
162 j.db.SaveLastTimeUs(time.Now().UnixMicro())
163 }
164 }
165}
166
167func (j *JetstreamClient) getLastTimeUs(ctx context.Context) *int64 {
168 l := log.FromContext(ctx)
169 lastTimeUs, err := j.db.GetLastTimeUs()
170 if err != nil {
171 l.Warn("couldn't get last time us, starting from now", "error", err)
172 lastTimeUs = time.Now().UnixMicro()
173 err = j.db.SaveLastTimeUs(lastTimeUs)
174 if err != nil {
175 l.Error("failed to save last time us", "error", err)
176 }
177 }
178
179 // If last time is older than 2 days, start from now
180 if time.Now().UnixMicro()-lastTimeUs > 2*24*60*60*1000*1000 {
181 lastTimeUs = time.Now().UnixMicro()
182 l.Warn("last time us is older than 2 days; discarding that and starting from now")
183 err = j.db.SaveLastTimeUs(lastTimeUs)
184 if err != nil {
185 l.Error("failed to save last time us", "error", err)
186 }
187 }
188
189 l.Info("found last time_us", "time_us", lastTimeUs)
190 return &lastTimeUs
191}
192
193func (j *JetstreamClient) saveIfKilled(ctx context.Context) context.Context {
194 ctxWithCancel, cancel := context.WithCancel(ctx)
195
196 sigChan := make(chan os.Signal, 1)
197
198 signal.Notify(sigChan,
199 syscall.SIGINT,
200 syscall.SIGTERM,
201 syscall.SIGQUIT,
202 syscall.SIGHUP,
203 syscall.SIGKILL,
204 syscall.SIGSTOP,
205 )
206
207 go func() {
208 sig := <-sigChan
209 j.l.Info("Received signal, initiating graceful shutdown", "signal", sig)
210
211 lastTimeUs := time.Now().UnixMicro()
212 if err := j.db.SaveLastTimeUs(lastTimeUs); err != nil {
213 j.l.Error("Failed to save last time during shutdown", "error", err)
214 }
215 j.l.Info("Saved lastTimeUs before shutdown", "lastTimeUs", lastTimeUs)
216
217 j.cancelMu.Lock()
218 if j.cancel != nil {
219 j.cancel()
220 }
221 j.cancelMu.Unlock()
222
223 cancel()
224
225 os.Exit(0)
226 }()
227
228 return ctxWithCancel
229}