its for when you want to get like notifications for your reposts
1package main
2
3import (
4 "context"
5 "log/slog"
6
7 "github.com/bluesky-social/jetstream/pkg/client"
8 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
9 "github.com/bluesky-social/jetstream/pkg/models"
10)
11
12type HandleEvent func(context.Context, *models.Event) error
13
14func startJetstreamLoop(logger *slog.Logger, outStream **client.Client, name string, handleEvent HandleEvent, optsFn func() models.SubscriberOptionsUpdatePayload) {
15 for {
16 stream, startFn, err := startJetstreamClient(name, optsFn(), handleEvent)
17 *outStream = stream
18 if startFn != nil {
19 err = startFn()
20 }
21 if err != nil {
22 logger.Error("stream failed", "name", name, "error", err)
23 }
24 }
25}
26
27func startJetstreamClient(name string, opts models.SubscriberOptionsUpdatePayload, handleEvent HandleEvent) (*client.Client, func() error, error) {
28 ctx := context.Background()
29
30 config := client.DefaultClientConfig()
31 config.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
32 config.Compress = true
33 config.WantedCollections = opts.WantedCollections
34 config.WantedDids = opts.WantedDIDs
35 config.RequireHello = false
36
37 scheduler := sequential.NewScheduler(name, logger, handleEvent)
38
39 c, err := client.NewClient(config, logger, scheduler)
40 if err != nil {
41 logger.Error("failed to create jetstream client", "name", name, "error", err)
42 return nil, nil, err
43 }
44
45 startFn := func() error {
46 logger.Info("starting jetstream client", "name", name, "collections", opts.WantedCollections, "wanted_dids", len(opts.WantedDIDs))
47 if err := c.ConnectAndRead(ctx, nil); err != nil {
48 logger.Error("jetstream client failed", "name", name, "error", err)
49 return err
50 }
51
52 return nil
53 }
54
55 return c, startFn, nil
56}