1package jetstream
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "sync"
8 "time"
9
10 "github.com/bluesky-social/jetstream/pkg/client"
11 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
12 "github.com/bluesky-social/jetstream/pkg/models"
13 "github.com/sotangled/tangled/log"
14)
15
// DB persists the jetstream resume cursor: a time_us value, compared against
// time.Now().UnixMicro() by the client, i.e. Unix microseconds.
//
// getLastTimeUs uses GetLastTimeUs to resume, SaveLastTimeUs when no cursor
// could be read, and UpdateLastTimeUs to overwrite a stale cursor.
type DB interface {
	// GetLastTimeUs returns the stored cursor.
	GetLastTimeUs() (int64, error)

	// UpdateLastTimeUs replaces the stored cursor.
	// NOTE(review): presumably assumes a cursor row already exists — confirm.
	UpdateLastTimeUs(timeUs int64) error

	// SaveLastTimeUs stores the cursor.
	// NOTE(review): presumably used to create the initial cursor — confirm.
	SaveLastTimeUs(timeUs int64) error
}
21
// JetstreamClient wraps a jetstream client.Client with cursor persistence
// (through DB), an optional wait for the first wanted DID before connecting,
// and a reconnect hook that UpdateDids uses to apply a changed DID filter.
type JetstreamClient struct {
	// cfg is handed to client.NewClient in StartJetstream; AddDid and
	// UpdateDids append to its WantedDids slice.
	cfg *client.ClientConfig
	// client is created by StartJetstream and is nil until then.
	client *client.Client
	// ident is passed as the first argument of sequential.NewScheduler.
	ident string
	l     *slog.Logger

	// db loads and stores the resume cursor (time_us).
	db DB
	// waitForDid makes StartJetstream's goroutine wait until cfg.WantedDids
	// is non-empty before connecting.
	waitForDid bool
	// mu guards cfg.WantedDids in AddDid and UpdateDids.
	// NOTE(review): the same cfg pointer is given to client.NewClient; if the
	// client reads WantedDids while connecting, those reads are not covered
	// by mu — confirm.
	mu sync.RWMutex

	// cancel ends the current connection attempt's context. connectAndRead
	// stores a fresh one per attempt; UpdateDids calls it to force a reconnect.
	cancel context.CancelFunc
	// cancelMu guards cancel.
	cancelMu sync.Mutex
}
35
36func (j *JetstreamClient) AddDid(did string) {
37 if did == "" {
38 return
39 }
40 j.mu.Lock()
41 j.cfg.WantedDids = append(j.cfg.WantedDids, did)
42 j.mu.Unlock()
43}
44
45func (j *JetstreamClient) UpdateDids(dids []string) {
46 j.mu.Lock()
47 for _, did := range dids {
48 if did != "" {
49 j.cfg.WantedDids = append(j.cfg.WantedDids, did)
50 }
51 }
52 j.mu.Unlock()
53
54 j.cancelMu.Lock()
55 if j.cancel != nil {
56 j.cancel()
57 }
58 j.cancelMu.Unlock()
59}
60
61func NewJetstreamClient(endpoint, ident string, collections []string, cfg *client.ClientConfig, logger *slog.Logger, db DB, waitForDid bool) (*JetstreamClient, error) {
62 if cfg == nil {
63 cfg = client.DefaultClientConfig()
64 cfg.WebsocketURL = endpoint
65 cfg.WantedCollections = collections
66 }
67
68 return &JetstreamClient{
69 cfg: cfg,
70 ident: ident,
71 db: db,
72 l: logger,
73
74 // This will make the goroutine in StartJetstream wait until
75 // cfg.WantedDids has been populated, typically using UpdateDids.
76 waitForDid: waitForDid,
77 }, nil
78}
79
80// StartJetstream starts the jetstream client and processes events using the provided processFunc.
81// The caller is responsible for saving the last time_us to the database (just use your db.SaveLastTimeUs).
82func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error {
83 logger := j.l
84
85 sched := sequential.NewScheduler(j.ident, logger, processFunc)
86
87 client, err := client.NewClient(j.cfg, log.New("jetstream"), sched)
88 if err != nil {
89 return fmt.Errorf("failed to create jetstream client: %w", err)
90 }
91 j.client = client
92
93 go func() {
94 if j.waitForDid {
95 for len(j.cfg.WantedDids) == 0 {
96 time.Sleep(time.Second)
97 }
98 }
99 logger.Info("done waiting for did")
100 j.connectAndRead(ctx)
101 }()
102
103 return nil
104}
105
106func (j *JetstreamClient) connectAndRead(ctx context.Context) {
107 l := log.FromContext(ctx)
108 for {
109 cursor := j.getLastTimeUs(ctx)
110
111 connCtx, cancel := context.WithCancel(ctx)
112 j.cancelMu.Lock()
113 j.cancel = cancel
114 j.cancelMu.Unlock()
115
116 if err := j.client.ConnectAndRead(connCtx, cursor); err != nil {
117 l.Error("error reading jetstream", "error", err)
118 cancel()
119 continue
120 }
121
122 select {
123 case <-ctx.Done():
124 l.Info("context done, stopping jetstream")
125 return
126 case <-connCtx.Done():
127 l.Info("connection context done, reconnecting")
128 continue
129 }
130 }
131}
132
133func (j *JetstreamClient) getLastTimeUs(ctx context.Context) *int64 {
134 l := log.FromContext(ctx)
135 lastTimeUs, err := j.db.GetLastTimeUs()
136 if err != nil {
137 l.Warn("couldn't get last time us, starting from now", "error", err)
138 lastTimeUs = time.Now().UnixMicro()
139 err = j.db.SaveLastTimeUs(lastTimeUs)
140 if err != nil {
141 l.Error("failed to save last time us", "error", err)
142 }
143 }
144
145 // If last time is older than a week, start from now
146 if time.Now().UnixMicro()-lastTimeUs > 2*24*60*60*1000*1000 {
147 lastTimeUs = time.Now().UnixMicro()
148 l.Warn("last time us is older than 2 days; discarding that and starting from now")
149 err = j.db.UpdateLastTimeUs(lastTimeUs)
150 if err != nil {
151 l.Error("failed to save last time us", "error", err)
152 }
153 }
154
155 l.Info("found last time_us", "time_us", lastTimeUs)
156 return &lastTimeUs
157}