It's for when you want to get notifications for your reposts.
1package main
2
3import (
4 "context"
5 "log/slog"
6 "time"
7
8 "github.com/bluesky-social/jetstream/pkg/client"
9 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
10 "github.com/bluesky-social/jetstream/pkg/models"
11)
12
13type HandleEvent func(context.Context, *models.Event) error
14
15func startJetstreamLoop(logger *slog.Logger, outStream **client.Client, name string, handleEvent HandleEvent, optsFn func() models.SubscriberOptionsUpdatePayload) {
16 for {
17 stream, startFn, err := startJetstreamClient(name, optsFn(), handleEvent)
18 *outStream = stream
19 if startFn != nil {
20 err = startFn()
21 }
22 if err != nil {
23 logger.Error("stream failed", "name", name, "error", err)
24 }
25 }
26}
27
28func startJetstreamClient(name string, opts models.SubscriberOptionsUpdatePayload, handleEvent HandleEvent) (*client.Client, func() error, error) {
29 ctx := context.Background()
30
31 config := client.DefaultClientConfig()
32 config.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
33 config.Compress = true
34 config.WantedCollections = opts.WantedCollections
35 config.WantedDids = opts.WantedDIDs
36 config.RequireHello = len(config.WantedDids) == 0
37
38 scheduler := sequential.NewScheduler(name, logger, handleEvent)
39
40 c, err := client.NewClient(config, logger, scheduler)
41 if err != nil {
42 logger.Error("failed to create jetstream client", "name", name, "error", err)
43 return nil, nil, err
44 }
45
46 startFn := func() error {
47 cursor := time.Now().UnixMicro()
48
49 logger.Info("starting jetstream client", "name", name, "collections", opts.WantedCollections, "wanted_dids", len(opts.WantedDIDs))
50 if err := c.ConnectAndRead(ctx, &cursor); err != nil {
51 logger.Error("jetstream client failed", "name", name, "error", err)
52 return err
53 }
54
55 return nil
56 }
57
58 return c, startFn, nil
59}