It's for when you want to get notifications for your reposts.
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "sync"
8 "time"
9
10 "github.com/bluesky-social/jetstream/pkg/client"
11 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
12 "github.com/bluesky-social/jetstream/pkg/models"
13)
14
// Chunk splits slice into consecutive sub-slices holding at most chunkSize
// elements each. Element order is preserved and only the last chunk may be
// shorter than chunkSize. An empty (or nil) slice yields nil.
//
// The chunks are views into slice's backing array, not copies. Each chunk has
// its capacity clipped to its length, so appending to a chunk allocates a new
// array instead of overwriting the first elements of the following chunk.
//
// Chunk panics if slice is non-empty and chunkSize is less than 1: a size of
// zero would never advance through the slice, and a negative size has no
// meaningful interpretation.
func Chunk[T any](slice []T, chunkSize int) [][]T {
	if len(slice) == 0 {
		return nil
	}
	if chunkSize < 1 {
		panic("Chunk: chunkSize must be at least 1")
	}

	// Ceiling division written so that it cannot overflow for a huge chunkSize.
	chunks := make([][]T, 0, (len(slice)-1)/chunkSize+1)
	for start := 0; start < len(slice); start += chunkSize {
		end := min(start+chunkSize, len(slice))
		chunks = append(chunks, slice[start:end:end])
	}
	return chunks
}
23
24type Stream struct {
25 inner *client.Client
26 cancel context.CancelFunc
27}
28type StreamManager struct {
29 ctx context.Context
30 logger *slog.Logger
31 streamsLock sync.Mutex
32 streams map[int]Stream
33 name string
34 handleEvent HandleEvent
35 optsFn OptsFn
36}
37
38func NewStreamManager(logger *slog.Logger, name string, handleEvent HandleEvent, optsFn OptsFn) StreamManager {
39 return StreamManager{
40 ctx: context.TODO(),
41 logger: logger.With("stream", name),
42 streamsLock: sync.Mutex{},
43 streams: make(map[int]Stream),
44 name: name,
45 handleEvent: handleEvent,
46 optsFn: optsFn,
47 }
48}
49
50// doesnt lock streams!!!
51func (manager *StreamManager) startSingle(id int, opts models.SubscriberOptionsUpdatePayload) {
52 ctx, cancel := context.WithCancel(manager.ctx)
53 stream := Stream{inner: nil, cancel: cancel}
54 // add to streams and put on wait group
55 manager.streams[id] = stream
56 go startJetstreamLoop(ctx, manager.logger.With("streamId", id), &stream.inner, fmt.Sprintf("%s_%d", manager.name, id), manager.handleEvent, opts)
57}
58
59func (manager *StreamManager) chunkedOpts() ([]models.SubscriberOptionsUpdatePayload, int) {
60 results := make([]models.SubscriberOptionsUpdatePayload, 0)
61 opts := manager.optsFn()
62 for _, wantedDidsChunk := range Chunk(opts.WantedDIDs, 9999) {
63 results = append(results, models.SubscriberOptionsUpdatePayload{
64 WantedCollections: opts.WantedCollections,
65 WantedDIDs: wantedDidsChunk,
66 MaxMessageSizeBytes: opts.MaxMessageSizeBytes,
67 })
68 }
69 return results, len(opts.WantedDIDs)
70}
71
72func (manager *StreamManager) updateOpts() {
73 chunks, userCount := manager.chunkedOpts()
74 manager.streamsLock.Lock()
75 idsSeen := make(map[int]struct{}, 0)
76 // update existing streams or create new ones
77 for id, opts := range chunks {
78 idsSeen[id] = struct{}{}
79 if len(manager.streams) > id {
80 stream := manager.streams[id]
81 if stream.inner == nil {
82 continue
83 }
84 if err := stream.inner.SendOptionsUpdate(opts); err != nil {
85 manager.logger.Error("couldnt update follow stream opts", "error", err, "streamId", id)
86 }
87 } else {
88 manager.startSingle(id, opts)
89 }
90 }
91 // cancel and delete unused streams
92 for k := range manager.streams {
93 if _, exists := idsSeen[k]; !exists {
94 manager.streams[k].cancel()
95 delete(manager.streams, k)
96 }
97 }
98 manager.streamsLock.Unlock()
99 manager.logger.Info("updated opts", "userCount", userCount)
100}
101
102type HandleEvent func(context.Context, *models.Event) error
103type OptsFn func() models.SubscriberOptionsUpdatePayload
104
105func startJetstreamLoop(ctx context.Context, logger *slog.Logger, outStream **client.Client, name string, handleEvent HandleEvent, opts models.SubscriberOptionsUpdatePayload) {
106 backoff := time.Second
107 for {
108 done := make(chan struct{})
109 if ctx.Err() != nil {
110 break
111 }
112 stream, startFn, err := startJetstreamClient(ctx, logger, name, handleEvent)
113 *outStream = stream
114 if startFn != nil {
115 logger.Info("starting jetstream client", "collections", opts.WantedCollections, "userCount", len(opts.WantedDIDs))
116 go func() {
117 err = startFn()
118 done <- struct{}{}
119 }()
120 // HACK: we need to wait for the websocket connection to start here. so we do
121 // need to upstream something to jetstream client
122 time.Sleep(time.Second * 2)
123 err = stream.SendOptionsUpdate(opts)
124 if err == nil {
125 <-done
126 }
127 }
128 if err != nil {
129 logger.Error("stream failed", "error", err, "backoff", backoff)
130 time.Sleep(backoff)
131 backoff = backoff * 2
132 } else {
133 backoff = time.Second
134 }
135 }
136}
137
138func startJetstreamClient(ctx context.Context, logger *slog.Logger, name string, handleEvent HandleEvent) (*client.Client, func() error, error) {
139 config := client.DefaultClientConfig()
140 config.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
141 config.Compress = true
142 config.RequireHello = true
143
144 scheduler := sequential.NewScheduler(name, logger, handleEvent)
145
146 c, err := client.NewClient(config, logger, scheduler)
147 if err != nil {
148 logger.Error("failed to create jetstream client", "error", err)
149 return nil, nil, err
150 }
151
152 startFn := func() error {
153 if err := c.ConnectAndRead(ctx, nil); err != nil {
154 logger.Error("jetstream client failed", "error", err)
155 return err
156 }
157
158 return nil
159 }
160
161 return c, startFn, nil
162}