package main

import (
	"context"
	"fmt"
	"log/slog"
	"sync"
	"time"

	"github.com/bluesky-social/jetstream/pkg/client"
	"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
	"github.com/bluesky-social/jetstream/pkg/models"
)

// Chunk splits slice into consecutive sub-slices of at most chunkSize
// elements; only the last chunk may be shorter. Elements are not copied: the
// chunks alias slice's backing array, but each chunk's capacity is clipped to
// its length so appending to one chunk cannot overwrite the next.
//
// An empty slice yields nil regardless of chunkSize. Chunk panics if slice is
// non-empty and chunkSize is less than 1, because no progress could be made
// (a zero size would otherwise loop forever).
func Chunk[T any](slice []T, chunkSize int) [][]T {
	if len(slice) == 0 {
		return nil
	}
	if chunkSize < 1 {
		panic("Chunk: chunkSize must be at least 1")
	}

	// len(slice) >= 1 here, so this form of ceiling division cannot overflow
	// even for a very large chunkSize.
	chunks := make([][]T, 0, (len(slice)-1)/chunkSize+1)
	for start := 0; start < len(slice); start += chunkSize {
		end := min(start+chunkSize, len(slice))
		chunks = append(chunks, slice[start:end:end])
	}
	return chunks
}

// Stream is one entry of StreamManager.streams.
type Stream struct {
	// inner is the jetstream client of this stream. It may be nil; updateOpts
	// skips streams whose inner is nil.
	inner *client.Client
	// cancel cancels the context the stream's loop was started with.
	cancel context.CancelFunc
}

// StreamManager keeps a set of jetstream streams, keyed by an integer id, that
// together cover the options returned by optsFn.
type StreamManager struct {
	// ctx is the parent of every per-stream context.
	ctx    context.Context
	logger *slog.Logger
	// streamsLock guards streams.
	streamsLock sync.Mutex
	streams     map[int]Stream
	// name is combined with the stream id to name each stream.
	name        string
	handleEvent HandleEvent
	optsFn      OptsFn
}

// NewStreamManager returns a StreamManager with no streams. Log records are
// tagged with the "stream" attribute set to name.
//
// It also starts a goroutine that logs the current stream count every two
// minutes. NOTE(review): that goroutine has no stop signal and runs for the
// lifetime of the process; stopping it would need a cancellable context or a
// Close method on the manager.
func NewStreamManager(logger *slog.Logger, name string, handleEvent HandleEvent, optsFn OptsFn) *StreamManager {
	manager := &StreamManager{
		ctx:         context.TODO(),
		logger:      logger.With("stream", name),
		streams:     make(map[int]Stream),
		name:        name,
		handleEvent: handleEvent,
		optsFn:      optsFn,
	}

	go func() {
		ticker := time.NewTicker(2 * time.Minute)
		defer ticker.Stop()
		for range ticker.C {
			// Read the count under the lock, but log after releasing it so
			// the lock is never held across logger I/O.
			manager.streamsLock.Lock()
			streamCount := len(manager.streams)
			manager.streamsLock.Unlock()
			manager.logger.Info("sharding stats", "streamCount", streamCount)
		}
	}()

	return manager
}

// doesnt lock streams!!!
func (manager *StreamManager) startSingle(id int, opts models.SubscriberOptionsUpdatePayload) { ctx, cancel := context.WithCancel(manager.ctx) stream := Stream{inner: nil, cancel: cancel} // add to streams and put on wait group manager.streams[id] = stream go startJetstreamLoop(ctx, manager.logger.With("streamId", id), &stream.inner, fmt.Sprintf("%s_%d", manager.name, id), manager.handleEvent, opts) } func (manager *StreamManager) chunkedOpts() ([]models.SubscriberOptionsUpdatePayload, int) { results := make([]models.SubscriberOptionsUpdatePayload, 0) opts := manager.optsFn() for _, wantedDidsChunk := range Chunk(opts.WantedDIDs, 9999) { results = append(results, models.SubscriberOptionsUpdatePayload{ WantedCollections: opts.WantedCollections, WantedDIDs: wantedDidsChunk, MaxMessageSizeBytes: opts.MaxMessageSizeBytes, }) } return results, len(opts.WantedDIDs) } func (manager *StreamManager) updateOpts() { chunks, userCount := manager.chunkedOpts() logger := manager.logger.With("userCount", userCount) manager.streamsLock.Lock() idsSeen := make(map[int]struct{}, len(manager.streams)) // update existing streams or create new ones for id, opts := range chunks { idsSeen[id] = struct{}{} if len(manager.streams) > id { stream := manager.streams[id] if stream.inner == nil { continue } if err := stream.inner.SendOptionsUpdate(opts); err != nil { logger.Error("couldnt update follow stream opts", "error", err, "streamId", id) } } else { manager.startSingle(id, opts) } } // cancel and delete unused streams for k := range manager.streams { if _, exists := idsSeen[k]; !exists { manager.streams[k].cancel() delete(manager.streams, k) } } manager.streamsLock.Unlock() logger.Info("updated opts") } type HandleEvent func(context.Context, *models.Event) error type OptsFn func() models.SubscriberOptionsUpdatePayload func startJetstreamLoop(ctx context.Context, logger *slog.Logger, outStream **client.Client, name string, handleEvent HandleEvent, opts 
models.SubscriberOptionsUpdatePayload) { backoff := time.Second for { select { case <-ctx.Done(): return default: streamDone := make(chan struct{}) stream, startFn, err := startJetstreamClient(ctx, logger, name, handleEvent) *outStream = stream if startFn != nil { logger.Info("starting jetstream client", "collections", opts.WantedCollections, "userCount", len(opts.WantedDIDs)) go func() { err = startFn() streamDone <- struct{}{} }() // HACK: we need to wait for the websocket connection to start here. so we do // need to upstream something to jetstream client time.Sleep(time.Second * 2) go func() { // HACK: also silly because it panics if the connection isnt established yet (why????????????) defer func() { panic := recover() if panic != nil { err = fmt.Errorf("%s", panic) } }() err = stream.SendOptionsUpdate(opts) }() if err == nil { <-streamDone } } if err != nil { logger.Error("stream failed", "error", err, "backoff", backoff) time.Sleep(backoff) backoff = backoff * 2 } else { backoff = time.Second } } } } func startJetstreamClient(ctx context.Context, logger *slog.Logger, name string, handleEvent HandleEvent) (*client.Client, func() error, error) { config := client.DefaultClientConfig() config.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe" config.Compress = true config.RequireHello = true scheduler := sequential.NewScheduler(name, logger, handleEvent) c, err := client.NewClient(config, logger, scheduler) if err != nil { logger.Error("failed to create jetstream client", "error", err) return nil, nil, err } startFn := func() error { if err := c.ConnectAndRead(ctx, nil); err != nil { logger.Error("jetstream client failed", "error", err) return err } return nil } return c, startFn, nil }