forked from tangled.org/core
this repo has no description
1package jetstream 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "sync" 8 "time" 9 10 "github.com/bluesky-social/jetstream/pkg/client" 11 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 12 "github.com/bluesky-social/jetstream/pkg/models" 13 "tangled.sh/tangled.sh/core/log" 14) 15 16type DB interface { 17 GetLastTimeUs() (int64, error) 18 SaveLastTimeUs(int64) error 19 UpdateLastTimeUs(int64) error 20} 21 22type JetstreamClient struct { 23 cfg *client.ClientConfig 24 client *client.Client 25 ident string 26 l *slog.Logger 27 28 db DB 29 waitForDid bool 30 mu sync.RWMutex 31 32 cancel context.CancelFunc 33 cancelMu sync.Mutex 34} 35 36func (j *JetstreamClient) AddDid(did string) { 37 if did == "" { 38 return 39 } 40 j.mu.Lock() 41 j.cfg.WantedDids = append(j.cfg.WantedDids, did) 42 j.mu.Unlock() 43} 44 45func (j *JetstreamClient) UpdateDids(dids []string) { 46 j.mu.Lock() 47 for _, did := range dids { 48 if did != "" { 49 j.cfg.WantedDids = append(j.cfg.WantedDids, did) 50 } 51 } 52 j.mu.Unlock() 53 54 j.cancelMu.Lock() 55 if j.cancel != nil { 56 j.cancel() 57 } 58 j.cancelMu.Unlock() 59} 60 61func NewJetstreamClient(endpoint, ident string, collections []string, cfg *client.ClientConfig, logger *slog.Logger, db DB, waitForDid bool) (*JetstreamClient, error) { 62 if cfg == nil { 63 cfg = client.DefaultClientConfig() 64 cfg.WebsocketURL = endpoint 65 cfg.WantedCollections = collections 66 } 67 68 return &JetstreamClient{ 69 cfg: cfg, 70 ident: ident, 71 db: db, 72 l: logger, 73 74 // This will make the goroutine in StartJetstream wait until 75 // cfg.WantedDids has been populated, typically using UpdateDids. 76 waitForDid: waitForDid, 77 }, nil 78} 79 80// StartJetstream starts the jetstream client and processes events using the provided processFunc. 81// The caller is responsible for saving the last time_us to the database (just use your db.SaveLastTimeUs). 82func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error { 83 logger := j.l 84 85 sched := sequential.NewScheduler(j.ident, logger, processFunc) 86 87 client, err := client.NewClient(j.cfg, log.New("jetstream"), sched) 88 if err != nil { 89 return fmt.Errorf("failed to create jetstream client: %w", err) 90 } 91 j.client = client 92 93 go func() { 94 if j.waitForDid { 95 for len(j.cfg.WantedDids) == 0 { 96 time.Sleep(time.Second) 97 } 98 } 99 logger.Info("done waiting for did") 100 j.connectAndRead(ctx) 101 }() 102 103 return nil 104} 105 106func (j *JetstreamClient) connectAndRead(ctx context.Context) { 107 l := log.FromContext(ctx) 108 for { 109 cursor := j.getLastTimeUs(ctx) 110 111 connCtx, cancel := context.WithCancel(ctx) 112 j.cancelMu.Lock() 113 j.cancel = cancel 114 j.cancelMu.Unlock() 115 116 if err := j.client.ConnectAndRead(connCtx, cursor); err != nil { 117 l.Error("error reading jetstream", "error", err) 118 cancel() 119 continue 120 } 121 122 select { 123 case <-ctx.Done(): 124 l.Info("context done, stopping jetstream") 125 return 126 case <-connCtx.Done(): 127 l.Info("connection context done, reconnecting") 128 continue 129 } 130 } 131} 132 133func (j *JetstreamClient) getLastTimeUs(ctx context.Context) *int64 { 134 l := log.FromContext(ctx) 135 lastTimeUs, err := j.db.GetLastTimeUs() 136 if err != nil { 137 l.Warn("couldn't get last time us, starting from now", "error", err) 138 lastTimeUs = time.Now().UnixMicro() 139 err = j.db.SaveLastTimeUs(lastTimeUs) 140 if err != nil { 141 l.Error("failed to save last time us", "error", err) 142 } 143 } 144 145 // If last time is older than a week, start from now 146 if time.Now().UnixMicro()-lastTimeUs > 2*24*60*60*1000*1000 { 147 lastTimeUs = time.Now().UnixMicro() 148 l.Warn("last time us is older than 2 days; discarding that and starting from now") 149 err = j.db.UpdateLastTimeUs(lastTimeUs) 150 if err != nil { 151 l.Error("failed to save last time us", "error", err) 152 } 153 } 154 155 l.Info("found last time_us", "time_us", lastTimeUs) 156 return &lastTimeUs 157}