forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package jetstream 2 3import ( 4 "context" 5 "fmt" 6 "sync" 7 "time" 8 9 "github.com/bluesky-social/jetstream/pkg/client" 10 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 11 "github.com/bluesky-social/jetstream/pkg/models" 12 "github.com/sotangled/tangled/log" 13) 14 15type DB interface { 16 GetLastTimeUs() (int64, error) 17 SaveLastTimeUs(int64) error 18} 19 20type JetstreamClient struct { 21 cfg *client.ClientConfig 22 client *client.Client 23 ident string 24 25 db DB 26 reconnectCh chan struct{} 27 waitForDid bool 28 mu sync.RWMutex 29} 30 31func (j *JetstreamClient) AddDid(did string) { 32 j.mu.Lock() 33 j.cfg.WantedDids = append(j.cfg.WantedDids, did) 34 j.mu.Unlock() 35 j.reconnectCh <- struct{}{} 36} 37 38func (j *JetstreamClient) UpdateDids(dids []string) { 39 j.mu.Lock() 40 j.cfg.WantedDids = dids 41 j.mu.Unlock() 42 j.reconnectCh <- struct{}{} 43} 44 45func NewJetstreamClient(ident string, collections []string, cfg *client.ClientConfig, db DB, waitForDid bool) (*JetstreamClient, error) { 46 if cfg == nil { 47 cfg = client.DefaultClientConfig() 48 cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe" 49 cfg.WantedCollections = collections 50 } 51 52 return &JetstreamClient{ 53 cfg: cfg, 54 ident: ident, 55 db: db, 56 57 // This will make the goroutine in StartJetstream wait until 58 // cfg.WantedDids has been populated, typically using UpdateDids. 59 waitForDid: waitForDid, 60 reconnectCh: make(chan struct{}, 1), 61 }, nil 62} 63 64func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error { 65 logger := log.FromContext(ctx) 66 67 pf := func(ctx context.Context, e *models.Event) error { 68 err := processFunc(ctx, e) 69 if err != nil { 70 return err 71 } 72 73 if err := j.db.SaveLastTimeUs(e.TimeUS); err != nil { 74 return err 75 } 76 77 return nil 78 } 79 80 sched := sequential.NewScheduler(j.ident, logger, pf) 81 82 client, err := client.NewClient(j.cfg, log.New("jetstream"), sched) 83 if err != nil { 84 return fmt.Errorf("failed to create jetstream client: %w", err) 85 } 86 j.client = client 87 88 go func() { 89 lastTimeUs := j.getLastTimeUs(ctx) 90 if j.waitForDid { 91 for len(j.cfg.WantedDids) == 0 { 92 time.Sleep(time.Second) 93 } 94 } 95 logger.Info("done waiting for did") 96 j.connectAndRead(ctx, &lastTimeUs) 97 }() 98 99 return nil 100} 101 102func (j *JetstreamClient) connectAndRead(ctx context.Context, cursor *int64) { 103 l := log.FromContext(ctx) 104 for { 105 select { 106 case <-j.reconnectCh: 107 l.Info("(re)connecting jetstream client") 108 j.client.Scheduler.Shutdown() 109 if err := j.client.ConnectAndRead(ctx, cursor); err != nil { 110 l.Error("error reading jetstream", "error", err) 111 } 112 default: 113 if err := j.client.ConnectAndRead(ctx, cursor); err != nil { 114 l.Error("error reading jetstream", "error", err) 115 } 116 } 117 } 118} 119 120func (j *JetstreamClient) getLastTimeUs(ctx context.Context) int64 { 121 l := log.FromContext(ctx) 122 lastTimeUs, err := j.db.GetLastTimeUs() 123 if err != nil { 124 l.Warn("couldn't get last time us, starting from now", "error", err) 125 lastTimeUs = time.Now().UnixMicro() 126 err = j.db.SaveLastTimeUs(lastTimeUs) 127 if err != nil { 128 l.Error("failed to save last time us") 129 } 130 } 131 132 // If last time is older than a week, start from now 133 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 { 134 lastTimeUs = time.Now().UnixMicro() 135 l.Warn("last time us is older than a week. discarding that and starting from now") 136 err = j.db.SaveLastTimeUs(lastTimeUs) 137 if err != nil { 138 l.Error("failed to save last time us") 139 } 140 } 141 142 l.Info("found last time_us", "time_us", lastTimeUs) 143 return lastTimeUs 144}