It's for when you want to get like notifications for your reposts.
1package main 2 3import ( 4 "context" 5 "log/slog" 6 7 "github.com/bluesky-social/jetstream/pkg/client" 8 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 9 "github.com/bluesky-social/jetstream/pkg/models" 10) 11 12type HandleEvent func(context.Context, *models.Event) error 13 14func startJetstreamLoop(logger *slog.Logger, outStream **client.Client, name string, handleEvent HandleEvent, optsFn func() models.SubscriberOptionsUpdatePayload) { 15 for { 16 stream, startFn, err := startJetstreamClient(name, optsFn(), handleEvent) 17 *outStream = stream 18 if startFn != nil { 19 err = startFn() 20 } 21 if err != nil { 22 logger.Error("stream failed", "name", name, "error", err) 23 } 24 } 25} 26 27func startJetstreamClient(name string, opts models.SubscriberOptionsUpdatePayload, handleEvent HandleEvent) (*client.Client, func() error, error) { 28 ctx := context.Background() 29 30 config := client.DefaultClientConfig() 31 config.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe" 32 config.Compress = true 33 config.WantedCollections = opts.WantedCollections 34 config.WantedDids = opts.WantedDIDs 35 config.RequireHello = false 36 37 scheduler := sequential.NewScheduler(name, logger, handleEvent) 38 39 c, err := client.NewClient(config, logger, scheduler) 40 if err != nil { 41 logger.Error("failed to create jetstream client", "name", name, "error", err) 42 return nil, nil, err 43 } 44 45 startFn := func() error { 46 logger.Info("starting jetstream client", "name", name, "collections", opts.WantedCollections, "wanted_dids", len(opts.WantedDIDs)) 47 if err := c.ConnectAndRead(ctx, nil); err != nil { 48 logger.Error("jetstream client failed", "name", name, "error", err) 49 return err 50 } 51 52 return nil 53 } 54 55 return c, startFn, nil 56}