package main

import (
	"context"
	"log/slog"
	"time"

	"github.com/bluesky-social/jetstream/pkg/client"
	"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
	"github.com/bluesky-social/jetstream/pkg/models"
)

// HandleEvent is the callback signature handed to the sequential scheduler
// in startJetstreamClient. It receives a context and one event and reports
// failure through the returned error.
type HandleEvent func(context.Context, *models.Event) error

// startJetstreamLoop keeps a jetstream client running and never returns.
//
// On every iteration it calls optsFn to obtain the current subscriber
// options, builds a new client with startJetstreamClient, stores that client
// in *outStream, and then runs the client's start function until it returns.
// Any error from construction or from the run is logged on logger.
//
// *outStream is overwritten on every iteration, including with nil when
// client construction fails, so readers of that pointer must tolerate nil.
// NOTE(review): the write is unsynchronized; confirm how callers read it.
//
// After a failed iteration the loop waits for a short fixed delay before
// retrying, so a persistent failure (bad URL, network down) does not turn
// into a busy loop. An iteration that ends without an error reconnects
// immediately.
func startJetstreamLoop(logger *slog.Logger, outStream **client.Client, name string, handleEvent HandleEvent, optsFn func() models.SubscriberOptionsUpdatePayload) {
	// Pause applied after a failed attempt before the next reconnect.
	const retryDelay = time.Second

	for {
		stream, startFn, err := startJetstreamClient(name, optsFn(), handleEvent)
		*outStream = stream

		// startFn is nil only when construction failed; in that case err
		// already holds the construction error.
		if startFn != nil {
			err = startFn()
		}

		if err != nil {
			logger.Error("stream failed", "name", name, "error", err)
			time.Sleep(retryDelay)
		}
	}
}

// startJetstreamClient builds a jetstream client without connecting it.
//
// The client is configured with a fixed websocket URL, compression enabled,
// RequireHello disabled, and the wanted collections and DIDs taken from opts.
// Events are dispatched to handleEvent through a sequential scheduler
// registered under name.
//
// It returns the client together with a start function. Calling the start
// function connects and reads with a background context and a nil cursor,
// and returns the error from ConnectAndRead, if any. When the client cannot
// be created, both the client and the start function are nil and the error
// is returned.
//
// NOTE(review): logger is not a parameter here; presumably it is a
// package-level *slog.Logger declared in another file of this package.
func startJetstreamClient(name string, opts models.SubscriberOptionsUpdatePayload, handleEvent HandleEvent) (*client.Client, func() error, error) {
	ctx := context.Background()

	config := client.DefaultClientConfig()
	config.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
	config.Compress = true
	config.WantedCollections = opts.WantedCollections
	config.WantedDids = opts.WantedDIDs
	config.RequireHello = false

	scheduler := sequential.NewScheduler(name, logger, handleEvent)

	c, err := client.NewClient(config, logger, scheduler)
	if err != nil {
		logger.Error("failed to create jetstream client", "name", name, "error", err)
		return nil, nil, err
	}

	startFn := func() error {
		logger.Info("starting jetstream client", "name", name, "collections", opts.WantedCollections, "wanted_dids", len(opts.WantedDIDs))
		if err := c.ConnectAndRead(ctx, nil); err != nil {
			logger.Error("jetstream client failed", "name", name, "error", err)
			return err
		}
		return nil
	}

	return c, startFn, nil
}