its for when you want to get like notifications for your reposts
at main 5.3 kB view raw
1package main 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "sync" 8 "time" 9 10 "github.com/bluesky-social/jetstream/pkg/client" 11 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 12 "github.com/bluesky-social/jetstream/pkg/models" 13) 14 15func Chunk[T any](slice []T, chunkSize int) [][]T { 16 var chunks [][]T 17 for i := 0; i < len(slice); i += chunkSize { 18 end := min(i+chunkSize, len(slice)) 19 chunks = append(chunks, slice[i:end]) 20 } 21 return chunks 22} 23 24type Stream struct { 25 inner *client.Client 26 cancel context.CancelFunc 27} 28type StreamManager struct { 29 ctx context.Context 30 logger *slog.Logger 31 streamsLock sync.Mutex 32 streams map[int]Stream 33 name string 34 handleEvent HandleEvent 35 optsFn OptsFn 36} 37 38func NewStreamManager(logger *slog.Logger, name string, handleEvent HandleEvent, optsFn OptsFn) *StreamManager { 39 manager := &StreamManager{ 40 ctx: context.TODO(), 41 logger: logger.With("stream", name), 42 streamsLock: sync.Mutex{}, 43 streams: make(map[int]Stream), 44 name: name, 45 handleEvent: handleEvent, 46 optsFn: optsFn, 47 } 48 go func() { 49 for { 50 time.Sleep(time.Minute * 2) 51 manager.streamsLock.Lock() 52 manager.logger.Info("sharding stats", "streamCount", len(manager.streams)) 53 manager.streamsLock.Unlock() 54 } 55 }() 56 return manager 57} 58 59// doesnt lock streams!!! 60func (manager *StreamManager) startSingle(id int, opts models.SubscriberOptionsUpdatePayload) { 61 ctx, cancel := context.WithCancel(manager.ctx) 62 stream := Stream{inner: nil, cancel: cancel} 63 // add to streams and put on wait group 64 manager.streams[id] = stream 65 go startJetstreamLoop(ctx, manager.logger.With("streamId", id), &stream.inner, fmt.Sprintf("%s_%d", manager.name, id), manager.handleEvent, opts) 66} 67 68func (manager *StreamManager) chunkedOpts() ([]models.SubscriberOptionsUpdatePayload, int) { 69 results := make([]models.SubscriberOptionsUpdatePayload, 0) 70 opts := manager.optsFn() 71 for _, wantedDidsChunk := range Chunk(opts.WantedDIDs, 9999) { 72 results = append(results, models.SubscriberOptionsUpdatePayload{ 73 WantedCollections: opts.WantedCollections, 74 WantedDIDs: wantedDidsChunk, 75 MaxMessageSizeBytes: opts.MaxMessageSizeBytes, 76 }) 77 } 78 return results, len(opts.WantedDIDs) 79} 80 81func (manager *StreamManager) updateOpts() { 82 chunks, userCount := manager.chunkedOpts() 83 logger := manager.logger.With("userCount", userCount) 84 manager.streamsLock.Lock() 85 idsSeen := make(map[int]struct{}, len(manager.streams)) 86 // update existing streams or create new ones 87 for id, opts := range chunks { 88 idsSeen[id] = struct{}{} 89 if len(manager.streams) > id { 90 stream := manager.streams[id] 91 if stream.inner == nil { 92 continue 93 } 94 if err := stream.inner.SendOptionsUpdate(opts); err != nil { 95 logger.Error("couldnt update follow stream opts", "error", err, "streamId", id) 96 } 97 } else { 98 manager.startSingle(id, opts) 99 } 100 } 101 // cancel and delete unused streams 102 for k := range manager.streams { 103 if _, exists := idsSeen[k]; !exists { 104 manager.streams[k].cancel() 105 delete(manager.streams, k) 106 } 107 } 108 manager.streamsLock.Unlock() 109 logger.Info("updated opts") 110} 111 112type HandleEvent func(context.Context, *models.Event) error 113type OptsFn func() models.SubscriberOptionsUpdatePayload 114 115func startJetstreamLoop(ctx context.Context, logger *slog.Logger, outStream **client.Client, name string, handleEvent HandleEvent, opts models.SubscriberOptionsUpdatePayload) { 116 backoff := time.Second 117 for { 118 select { 119 case <-ctx.Done(): 120 return 121 default: 122 streamDone := make(chan struct{}) 123 stream, startFn, err := startJetstreamClient(ctx, logger, name, handleEvent) 124 *outStream = stream 125 if startFn != nil { 126 logger.Info("starting jetstream client", "collections", opts.WantedCollections, "userCount", len(opts.WantedDIDs)) 127 go func() { 128 err = startFn() 129 streamDone <- struct{}{} 130 }() 131 // HACK: we need to wait for the websocket connection to start here. so we do 132 // need to upstream something to jetstream client 133 time.Sleep(time.Second * 2) 134 go func() { 135 // HACK: also silly because it panics if the connection isnt established yet (why????????????) 136 defer func() { 137 panic := recover() 138 if panic != nil { 139 err = fmt.Errorf("%s", panic) 140 } 141 }() 142 err = stream.SendOptionsUpdate(opts) 143 }() 144 if err == nil { 145 <-streamDone 146 } 147 } 148 if err != nil { 149 logger.Error("stream failed", "error", err, "backoff", backoff) 150 time.Sleep(backoff) 151 backoff = backoff * 2 152 } else { 153 backoff = time.Second 154 } 155 } 156 } 157} 158 159func startJetstreamClient(ctx context.Context, logger *slog.Logger, name string, handleEvent HandleEvent) (*client.Client, func() error, error) { 160 config := client.DefaultClientConfig() 161 config.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe" 162 config.Compress = true 163 config.RequireHello = true 164 165 scheduler := sequential.NewScheduler(name, logger, handleEvent) 166 167 c, err := client.NewClient(config, logger, scheduler) 168 if err != nil { 169 logger.Error("failed to create jetstream client", "error", err) 170 return nil, nil, err 171 } 172 173 startFn := func() error { 174 if err := c.ConnectAndRead(ctx, nil); err != nil { 175 logger.Error("jetstream client failed", "error", err) 176 return err 177 } 178 179 return nil 180 } 181 182 return c, startFn, nil 183}