It's for when you want to get like notifications for your reposts.
1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "sync"
8 "time"
9
10 "github.com/bluesky-social/jetstream/pkg/client"
11 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
12 "github.com/bluesky-social/jetstream/pkg/models"
13)
14
// Chunk splits slice into consecutive sub-slices holding at most chunkSize
// elements each; only the last chunk may be shorter. An empty slice yields nil.
//
// The chunks share slice's backing array, but each chunk's capacity is clipped
// to its length, so appending to a chunk reallocates instead of overwriting
// the first elements of the chunk that follows it.
//
// Chunk panics if chunkSize is less than 1: a size of zero can never consume
// the input (the loop would append empty chunks forever) and a negative size
// produces invalid slice bounds.
func Chunk[T any](slice []T, chunkSize int) [][]T {
	if chunkSize < 1 {
		panic("Chunk: chunkSize must be at least 1")
	}
	if len(slice) == 0 {
		return nil
	}

	// Ceiling division written so it cannot overflow for very large chunkSize.
	count := len(slice) / chunkSize
	if len(slice)%chunkSize != 0 {
		count++
	}

	chunks := make([][]T, 0, count)
	for rest := slice; len(rest) > 0; {
		n := min(chunkSize, len(rest))
		// Three-index slice: cap == len, so chunks never grow into each other.
		chunks = append(chunks, rest[:n:n])
		rest = rest[n:]
	}
	return chunks
}
23
24type Stream struct {
25 inner *client.Client
26 cancel context.CancelFunc
27}
28type StreamManager struct {
29 ctx context.Context
30 logger *slog.Logger
31 streamsLock sync.Mutex
32 streams map[int]Stream
33 name string
34 handleEvent HandleEvent
35 optsFn OptsFn
36}
37
38func NewStreamManager(logger *slog.Logger, name string, handleEvent HandleEvent, optsFn OptsFn) *StreamManager {
39 manager := &StreamManager{
40 ctx: context.TODO(),
41 logger: logger.With("stream", name),
42 streamsLock: sync.Mutex{},
43 streams: make(map[int]Stream),
44 name: name,
45 handleEvent: handleEvent,
46 optsFn: optsFn,
47 }
48 go func() {
49 for {
50 time.Sleep(time.Minute * 2)
51 manager.streamsLock.Lock()
52 manager.logger.Info("sharding stats", "streamCount", len(manager.streams))
53 manager.streamsLock.Unlock()
54 }
55 }()
56 return manager
57}
58
59// doesnt lock streams!!!
60func (manager *StreamManager) startSingle(id int, opts models.SubscriberOptionsUpdatePayload) {
61 ctx, cancel := context.WithCancel(manager.ctx)
62 stream := Stream{inner: nil, cancel: cancel}
63 // add to streams and put on wait group
64 manager.streams[id] = stream
65 go startJetstreamLoop(ctx, manager.logger.With("streamId", id), &stream.inner, fmt.Sprintf("%s_%d", manager.name, id), manager.handleEvent, opts)
66}
67
68func (manager *StreamManager) chunkedOpts() ([]models.SubscriberOptionsUpdatePayload, int) {
69 results := make([]models.SubscriberOptionsUpdatePayload, 0)
70 opts := manager.optsFn()
71 for _, wantedDidsChunk := range Chunk(opts.WantedDIDs, 9999) {
72 results = append(results, models.SubscriberOptionsUpdatePayload{
73 WantedCollections: opts.WantedCollections,
74 WantedDIDs: wantedDidsChunk,
75 MaxMessageSizeBytes: opts.MaxMessageSizeBytes,
76 })
77 }
78 return results, len(opts.WantedDIDs)
79}
80
81func (manager *StreamManager) updateOpts() {
82 chunks, userCount := manager.chunkedOpts()
83 logger := manager.logger.With("userCount", userCount)
84 manager.streamsLock.Lock()
85 idsSeen := make(map[int]struct{}, len(manager.streams))
86 // update existing streams or create new ones
87 for id, opts := range chunks {
88 idsSeen[id] = struct{}{}
89 if len(manager.streams) > id {
90 stream := manager.streams[id]
91 if stream.inner == nil {
92 continue
93 }
94 if err := stream.inner.SendOptionsUpdate(opts); err != nil {
95 logger.Error("couldnt update follow stream opts", "error", err, "streamId", id)
96 }
97 } else {
98 manager.startSingle(id, opts)
99 }
100 }
101 // cancel and delete unused streams
102 for k := range manager.streams {
103 if _, exists := idsSeen[k]; !exists {
104 manager.streams[k].cancel()
105 delete(manager.streams, k)
106 }
107 }
108 manager.streamsLock.Unlock()
109 logger.Info("updated opts")
110}
111
112type HandleEvent func(context.Context, *models.Event) error
113type OptsFn func() models.SubscriberOptionsUpdatePayload
114
115func startJetstreamLoop(ctx context.Context, logger *slog.Logger, outStream **client.Client, name string, handleEvent HandleEvent, opts models.SubscriberOptionsUpdatePayload) {
116 backoff := time.Second
117 for {
118 select {
119 case <-ctx.Done():
120 return
121 default:
122 streamDone := make(chan struct{})
123 stream, startFn, err := startJetstreamClient(ctx, logger, name, handleEvent)
124 *outStream = stream
125 if startFn != nil {
126 logger.Info("starting jetstream client", "collections", opts.WantedCollections, "userCount", len(opts.WantedDIDs))
127 go func() {
128 err = startFn()
129 streamDone <- struct{}{}
130 }()
131 // HACK: we need to wait for the websocket connection to start here. so we do
132 // need to upstream something to jetstream client
133 time.Sleep(time.Second * 2)
134 go func() {
135 // HACK: also silly because it panics if the connection isnt established yet (why????????????)
136 defer func() {
137 panic := recover()
138 if panic != nil {
139 err = fmt.Errorf("%s", panic)
140 }
141 }()
142 err = stream.SendOptionsUpdate(opts)
143 }()
144 if err == nil {
145 <-streamDone
146 }
147 }
148 if err != nil {
149 logger.Error("stream failed", "error", err, "backoff", backoff)
150 time.Sleep(backoff)
151 backoff = backoff * 2
152 } else {
153 backoff = time.Second
154 }
155 }
156 }
157}
158
159func startJetstreamClient(ctx context.Context, logger *slog.Logger, name string, handleEvent HandleEvent) (*client.Client, func() error, error) {
160 config := client.DefaultClientConfig()
161 config.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
162 config.Compress = true
163 config.RequireHello = true
164
165 scheduler := sequential.NewScheduler(name, logger, handleEvent)
166
167 c, err := client.NewClient(config, logger, scheduler)
168 if err != nil {
169 logger.Error("failed to create jetstream client", "error", err)
170 return nil, nil, err
171 }
172
173 startFn := func() error {
174 if err := c.ConnectAndRead(ctx, nil); err != nil {
175 logger.Error("jetstream client failed", "error", err)
176 return err
177 }
178
179 return nil
180 }
181
182 return c, startFn, nil
183}