its for when you want to get like notifications for your reposts
1package main 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "sync" 8 "time" 9 10 "github.com/bluesky-social/jetstream/pkg/client" 11 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 12 "github.com/bluesky-social/jetstream/pkg/models" 13) 14 15func Chunk[T any](slice []T, chunkSize int) [][]T { 16 var chunks [][]T 17 for i := 0; i < len(slice); i += chunkSize { 18 end := min(i+chunkSize, len(slice)) 19 chunks = append(chunks, slice[i:end]) 20 } 21 return chunks 22} 23 24type Stream struct { 25 inner *client.Client 26 cancel context.CancelFunc 27} 28type StreamManager struct { 29 ctx context.Context 30 logger *slog.Logger 31 streamsLock sync.Mutex 32 streams map[int]Stream 33 name string 34 handleEvent HandleEvent 35 optsFn OptsFn 36} 37 38func NewStreamManager(logger *slog.Logger, name string, handleEvent HandleEvent, optsFn OptsFn) StreamManager { 39 return StreamManager{ 40 ctx: context.TODO(), 41 logger: logger.With("stream", name), 42 streamsLock: sync.Mutex{}, 43 streams: make(map[int]Stream), 44 name: name, 45 handleEvent: handleEvent, 46 optsFn: optsFn, 47 } 48} 49 50// doesnt lock streams!!! 51func (manager *StreamManager) startSingle(id int, opts models.SubscriberOptionsUpdatePayload) { 52 ctx, cancel := context.WithCancel(manager.ctx) 53 stream := Stream{inner: nil, cancel: cancel} 54 // add to streams and put on wait group 55 manager.streams[id] = stream 56 go startJetstreamLoop(ctx, manager.logger.With("streamId", id), &stream.inner, fmt.Sprintf("%s_%d", manager.name, id), manager.handleEvent, opts) 57} 58 59func (manager *StreamManager) chunkedOpts() ([]models.SubscriberOptionsUpdatePayload, int) { 60 results := make([]models.SubscriberOptionsUpdatePayload, 0) 61 opts := manager.optsFn() 62 for _, wantedDidsChunk := range Chunk(opts.WantedDIDs, 9999) { 63 results = append(results, models.SubscriberOptionsUpdatePayload{ 64 WantedCollections: opts.WantedCollections, 65 WantedDIDs: wantedDidsChunk, 66 MaxMessageSizeBytes: opts.MaxMessageSizeBytes, 67 }) 68 } 69 return results, len(opts.WantedDIDs) 70} 71 72func (manager *StreamManager) updateOpts() { 73 chunks, userCount := manager.chunkedOpts() 74 manager.streamsLock.Lock() 75 idsSeen := make(map[int]struct{}, 0) 76 // update existing streams or create new ones 77 for id, opts := range chunks { 78 idsSeen[id] = struct{}{} 79 if len(manager.streams) > id { 80 stream := manager.streams[id] 81 if stream.inner == nil { 82 continue 83 } 84 if err := stream.inner.SendOptionsUpdate(opts); err != nil { 85 manager.logger.Error("couldnt update follow stream opts", "error", err, "streamId", id) 86 } 87 } else { 88 manager.startSingle(id, opts) 89 } 90 } 91 // cancel and delete unused streams 92 for k := range manager.streams { 93 if _, exists := idsSeen[k]; !exists { 94 manager.streams[k].cancel() 95 delete(manager.streams, k) 96 } 97 } 98 manager.streamsLock.Unlock() 99 manager.logger.Info("updated opts", "userCount", userCount) 100} 101 102type HandleEvent func(context.Context, *models.Event) error 103type OptsFn func() models.SubscriberOptionsUpdatePayload 104 105func startJetstreamLoop(ctx context.Context, logger *slog.Logger, outStream **client.Client, name string, handleEvent HandleEvent, opts models.SubscriberOptionsUpdatePayload) { 106 backoff := time.Second 107 for { 108 done := make(chan struct{}) 109 if ctx.Err() != nil { 110 break 111 } 112 stream, startFn, err := startJetstreamClient(ctx, logger, name, handleEvent) 113 *outStream = stream 114 if startFn != nil { 115 logger.Info("starting jetstream client", "collections", opts.WantedCollections, "userCount", len(opts.WantedDIDs)) 116 go func() { 117 err = startFn() 118 done <- struct{}{} 119 }() 120 // HACK: we need to wait for the websocket connection to start here. so we do 121 // need to upstream something to jetstream client 122 time.Sleep(time.Second * 2) 123 err = stream.SendOptionsUpdate(opts) 124 if err == nil { 125 <-done 126 } 127 } 128 if err != nil { 129 logger.Error("stream failed", "error", err, "backoff", backoff) 130 time.Sleep(backoff) 131 backoff = backoff * 2 132 } else { 133 backoff = time.Second 134 } 135 } 136} 137 138func startJetstreamClient(ctx context.Context, logger *slog.Logger, name string, handleEvent HandleEvent) (*client.Client, func() error, error) { 139 config := client.DefaultClientConfig() 140 config.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe" 141 config.Compress = true 142 config.RequireHello = true 143 144 scheduler := sequential.NewScheduler(name, logger, handleEvent) 145 146 c, err := client.NewClient(config, logger, scheduler) 147 if err != nil { 148 logger.Error("failed to create jetstream client", "error", err) 149 return nil, nil, err 150 } 151 152 startFn := func() error { 153 if err := c.ConnectAndRead(ctx, nil); err != nil { 154 logger.Error("jetstream client failed", "error", err) 155 return err 156 } 157 158 return nil 159 } 160 161 return c, startFn, nil 162}