It's for when you want to get notifications for your reposts.
1package main 2 3import ( 4 "context" 5 "log/slog" 6 "time" 7 8 "github.com/bluesky-social/jetstream/pkg/client" 9 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 10 "github.com/bluesky-social/jetstream/pkg/models" 11) 12 13type HandleEvent func(context.Context, *models.Event) error 14 15func startJetstreamLoop(logger *slog.Logger, outStream **client.Client, name string, handleEvent HandleEvent, optsFn func() models.SubscriberOptionsUpdatePayload) { 16 for { 17 stream, startFn, err := startJetstreamClient(name, optsFn(), handleEvent) 18 *outStream = stream 19 if startFn != nil { 20 err = startFn() 21 } 22 if err != nil { 23 logger.Error("stream failed", "name", name, "error", err) 24 } 25 } 26} 27 28func startJetstreamClient(name string, opts models.SubscriberOptionsUpdatePayload, handleEvent HandleEvent) (*client.Client, func() error, error) { 29 ctx := context.Background() 30 31 config := client.DefaultClientConfig() 32 config.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe" 33 config.Compress = true 34 config.WantedCollections = opts.WantedCollections 35 config.WantedDids = opts.WantedDIDs 36 config.RequireHello = len(config.WantedDids) == 0 37 38 scheduler := sequential.NewScheduler(name, logger, handleEvent) 39 40 c, err := client.NewClient(config, logger, scheduler) 41 if err != nil { 42 logger.Error("failed to create jetstream client", "name", name, "error", err) 43 return nil, nil, err 44 } 45 46 startFn := func() error { 47 cursor := time.Now().UnixMicro() 48 49 logger.Info("starting jetstream client", "name", name, "collections", opts.WantedCollections, "wanted_dids", len(opts.WantedDIDs)) 50 if err := c.ConnectAndRead(ctx, &cursor); err != nil { 51 logger.Error("jetstream client failed", "name", name, "error", err) 52 return err 53 } 54 55 return nil 56 } 57 58 return c, startFn, nil 59}