1package jetstream
2
3import (
4 "context"
5 "fmt"
6 "sync"
7 "time"
8
9 "github.com/bluesky-social/jetstream/pkg/client"
10 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
11 "github.com/bluesky-social/jetstream/pkg/models"
12 "github.com/sotangled/tangled/log"
13)
14
// DB persists the jetstream cursor: the time_us of the most recently
// processed event, so a restarted consumer can resume from it.
type DB interface {
	// SaveLastTimeUs records timeUs as the latest processed cursor.
	SaveLastTimeUs(timeUs int64) error

	// GetLastTimeUs returns the most recently saved cursor.
	GetLastTimeUs() (int64, error)
}
19
// JetstreamClient wraps a jetstream client.Client. It persists the time_us
// of processed events through DB and can be asked to reconnect after the
// set of wanted DIDs changes.
type JetstreamClient struct {
	cfg    *client.ClientConfig
	client *client.Client
	ident  string

	db DB

	// reconnectCh carries reconnect requests from AddDid/UpdateDids to
	// connectAndRead. It is created with a one-slot buffer in
	// NewJetstreamClient.
	reconnectCh chan struct{}

	// waitForDid makes the goroutine started by StartJetstream hold off
	// connecting until cfg.WantedDids is non-empty.
	waitForDid bool

	// mu is held by AddDid and UpdateDids while they replace
	// cfg.WantedDids.
	// NOTE(review): cfg is also handed to client.NewClient, which presumably
	// reads WantedDids on connect without taking mu — confirm.
	mu sync.RWMutex
}
30
31func (j *JetstreamClient) AddDid(did string) {
32 j.mu.Lock()
33 j.cfg.WantedDids = append(j.cfg.WantedDids, did)
34 j.mu.Unlock()
35 j.reconnectCh <- struct{}{}
36}
37
38func (j *JetstreamClient) UpdateDids(dids []string) {
39 j.mu.Lock()
40 j.cfg.WantedDids = dids
41 j.mu.Unlock()
42 j.reconnectCh <- struct{}{}
43}
44
45func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, db DB, waitForDid bool) (*JetstreamClient, error) {
46 if cfg == nil {
47 cfg = client.DefaultClientConfig()
48 cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
49 cfg.WantedCollections = collections
50 }
51
52 return &JetstreamClient{
53 cfg: cfg,
54 ident: ident,
55 db: db,
56
57 // This will make the goroutine in StartJetstream wait until
58 // cfg.WantedDids has been populated, typically using UpdateDids.
59 waitForDid: waitForDid,
60 reconnectCh: make(chan struct{}, 1),
61 }, nil
62}
63
64func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error {
65 logger := log.FromContext(ctx)
66
67 pf := func(ctx context.Context, e *models.Event) error {
68 err := processFunc(ctx, e)
69 if err != nil {
70 return err
71 }
72
73 if err := j.db.SaveLastTimeUs(e.TimeUS); err != nil {
74 return err
75 }
76
77 return nil
78 }
79
80 sched := sequential.NewScheduler(j.ident, logger, pf)
81
82 client, err := client.NewClient(j.cfg, log.New("jetstream"), sched)
83 if err != nil {
84 return fmt.Errorf("failed to create jetstream client: %w", err)
85 }
86 j.client = client
87
88 go func() {
89 lastTimeUs := j.getLastTimeUs(ctx)
90 if j.waitForDid {
91 for len(j.cfg.WantedDids) == 0 {
92 time.Sleep(time.Second)
93 }
94 }
95 logger.Info("done waiting for did")
96 j.connectAndRead(ctx, &lastTimeUs)
97 }()
98
99 return nil
100}
101
102func (j *JetstreamClient) connectAndRead(ctx context.Context, cursor *int64) {
103 l := log.FromContext(ctx)
104 for {
105 select {
106 case <-j.reconnectCh:
107 l.Info("(re)connecting jetstream client")
108 j.client.Scheduler.Shutdown()
109 if err := j.client.ConnectAndRead(ctx, cursor); err != nil {
110 l.Error("error reading jetstream", "error", err)
111 }
112 default:
113 if err := j.client.ConnectAndRead(ctx, cursor); err != nil {
114 l.Error("error reading jetstream", "error", err)
115 }
116 }
117 }
118}
119
120func (j *JetstreamClient) getLastTimeUs(ctx context.Context) int64 {
121 l := log.FromContext(ctx)
122 lastTimeUs, err := j.db.GetLastTimeUs()
123 if err != nil {
124 l.Warn("couldn't get last time us, starting from now", "error", err)
125 lastTimeUs = time.Now().UnixMicro()
126 err = j.db.SaveLastTimeUs(lastTimeUs)
127 if err != nil {
128 l.Error("failed to save last time us")
129 }
130 }
131
132 // If last time is older than a week, start from now
133 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 {
134 lastTimeUs = time.Now().UnixMicro()
135 l.Warn("last time us is older than a week. discarding that and starting from now")
136 err = j.db.SaveLastTimeUs(lastTimeUs)
137 if err != nil {
138 l.Error("failed to save last time us")
139 }
140 }
141
142 l.Info("found last time_us", "time_us", lastTimeUs)
143 return lastTimeUs
144}