forked from tangled.org/core
this repo has no description
1package jetstream 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "os" 8 "os/signal" 9 "sync" 10 "syscall" 11 "time" 12 13 "github.com/bluesky-social/jetstream/pkg/client" 14 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 15 "github.com/bluesky-social/jetstream/pkg/models" 16 "tangled.sh/tangled.sh/core/log" 17) 18 19type DB interface { 20 GetLastTimeUs() (int64, error) 21 SaveLastTimeUs(int64) error 22} 23 24type Set[T comparable] map[T]struct{} 25 26type JetstreamClient struct { 27 cfg *client.ClientConfig 28 client *client.Client 29 ident string 30 l *slog.Logger 31 32 logDids bool 33 wantedDids Set[string] 34 db DB 35 waitForDid bool 36 mu sync.RWMutex 37 38 cancel context.CancelFunc 39 cancelMu sync.Mutex 40} 41 42func (j *JetstreamClient) AddDid(did string) { 43 if did == "" { 44 return 45 } 46 47 if j.logDids { 48 j.l.Info("adding did to in-memory filter", "did", did) 49 } 50 j.mu.Lock() 51 j.wantedDids[did] = struct{}{} 52 j.mu.Unlock() 53} 54 55type processor func(context.Context, *models.Event) error 56 57func (j *JetstreamClient) withDidFilter(processFunc processor) processor { 58 // empty filter => all dids allowed 59 if len(j.wantedDids) == 0 { 60 return processFunc 61 } 62 // since this closure references j.WantedDids; it should auto-update 63 // existing instances of the closure when j.WantedDids is mutated 64 return func(ctx context.Context, evt *models.Event) error { 65 if _, ok := j.wantedDids[evt.Did]; ok { 66 return processFunc(ctx, evt) 67 } else { 68 return nil 69 } 70 } 71} 72 73func NewJetstreamClient(endpoint, ident string, collections []string, cfg *client.ClientConfig, logger *slog.Logger, db DB, waitForDid, logDids bool) (*JetstreamClient, error) { 74 if cfg == nil { 75 cfg = client.DefaultClientConfig() 76 cfg.WebsocketURL = endpoint 77 cfg.WantedCollections = collections 78 } 79 80 return &JetstreamClient{ 81 cfg: cfg, 82 ident: ident, 83 db: db, 84 l: logger, 85 wantedDids: make(map[string]struct{}), 86 87 logDids: logDids, 88 89 // This will make the goroutine in StartJetstream wait until 90 // j.wantedDids has been populated, typically using addDids. 91 waitForDid: waitForDid, 92 }, nil 93} 94 95// StartJetstream starts the jetstream client and processes events using the provided processFunc. 96// The caller is responsible for saving the last time_us to the database (just use your db.UpdateLastTimeUs). 97func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error { 98 logger := j.l 99 100 sched := sequential.NewScheduler(j.ident, logger, j.withDidFilter(processFunc)) 101 102 client, err := client.NewClient(j.cfg, log.New("jetstream"), sched) 103 if err != nil { 104 return fmt.Errorf("failed to create jetstream client: %w", err) 105 } 106 j.client = client 107 108 go func() { 109 if j.waitForDid { 110 for len(j.wantedDids) == 0 { 111 time.Sleep(time.Second) 112 } 113 } 114 logger.Info("done waiting for did") 115 116 go j.periodicLastTimeSave(ctx) 117 j.saveIfKilled(ctx) 118 119 j.connectAndRead(ctx) 120 }() 121 122 return nil 123} 124 125func (j *JetstreamClient) connectAndRead(ctx context.Context) { 126 l := log.FromContext(ctx) 127 for { 128 cursor := j.getLastTimeUs(ctx) 129 130 connCtx, cancel := context.WithCancel(ctx) 131 j.cancelMu.Lock() 132 j.cancel = cancel 133 j.cancelMu.Unlock() 134 135 if err := j.client.ConnectAndRead(connCtx, cursor); err != nil { 136 l.Error("error reading jetstream", "error", err) 137 cancel() 138 continue 139 } 140 141 select { 142 case <-ctx.Done(): 143 l.Info("context done, stopping jetstream") 144 return 145 case <-connCtx.Done(): 146 l.Info("connection context done, reconnecting") 147 continue 148 } 149 } 150} 151 152// save cursor periodically 153func (j *JetstreamClient) periodicLastTimeSave(ctx context.Context) { 154 ticker := time.NewTicker(time.Minute) 155 defer ticker.Stop() 156 157 for { 158 select { 159 case <-ctx.Done(): 160 return 161 case <-ticker.C: 162 j.db.SaveLastTimeUs(time.Now().UnixMicro()) 163 } 164 } 165} 166 167func (j *JetstreamClient) getLastTimeUs(ctx context.Context) *int64 { 168 l := log.FromContext(ctx) 169 lastTimeUs, err := j.db.GetLastTimeUs() 170 if err != nil { 171 l.Warn("couldn't get last time us, starting from now", "error", err) 172 lastTimeUs = time.Now().UnixMicro() 173 err = j.db.SaveLastTimeUs(lastTimeUs) 174 if err != nil { 175 l.Error("failed to save last time us", "error", err) 176 } 177 } 178 179 // If last time is older than 2 days, start from now 180 if time.Now().UnixMicro()-lastTimeUs > 2*24*60*60*1000*1000 { 181 lastTimeUs = time.Now().UnixMicro() 182 l.Warn("last time us is older than 2 days; discarding that and starting from now") 183 err = j.db.SaveLastTimeUs(lastTimeUs) 184 if err != nil { 185 l.Error("failed to save last time us", "error", err) 186 } 187 } 188 189 l.Info("found last time_us", "time_us", lastTimeUs) 190 return &lastTimeUs 191} 192 193func (j *JetstreamClient) saveIfKilled(ctx context.Context) context.Context { 194 ctxWithCancel, cancel := context.WithCancel(ctx) 195 196 sigChan := make(chan os.Signal, 1) 197 198 signal.Notify(sigChan, 199 syscall.SIGINT, 200 syscall.SIGTERM, 201 syscall.SIGQUIT, 202 syscall.SIGHUP, 203 syscall.SIGKILL, 204 syscall.SIGSTOP, 205 ) 206 207 go func() { 208 sig := <-sigChan 209 j.l.Info("Received signal, initiating graceful shutdown", "signal", sig) 210 211 lastTimeUs := time.Now().UnixMicro() 212 if err := j.db.SaveLastTimeUs(lastTimeUs); err != nil { 213 j.l.Error("Failed to save last time during shutdown", "error", err) 214 } 215 j.l.Info("Saved lastTimeUs before shutdown", "lastTimeUs", lastTimeUs) 216 217 j.cancelMu.Lock() 218 if j.cancel != nil { 219 j.cancel() 220 } 221 j.cancelMu.Unlock() 222 223 cancel() 224 225 os.Exit(0) 226 }() 227 228 return ctxWithCancel 229}