its for when you want to get like notifications for your reposts

fix(server): improve shard handling

ptr.pet 9fc61c80 cebff7b9

verified
Changed files
+53 -32
server
+51 -30
server/jetstream.go
···
optsFn OptsFn
}
-
func NewStreamManager(logger *slog.Logger, name string, handleEvent HandleEvent, optsFn OptsFn) StreamManager {
-
return StreamManager{
+
func NewStreamManager(logger *slog.Logger, name string, handleEvent HandleEvent, optsFn OptsFn) *StreamManager {
+
manager := &StreamManager{
ctx: context.TODO(),
logger: logger.With("stream", name),
streamsLock: sync.Mutex{},
···
handleEvent: handleEvent,
optsFn: optsFn,
}
+
go func() {
+
for {
+
time.Sleep(time.Minute * 2)
+
manager.streamsLock.Lock()
+
manager.logger.Info("sharding stats", "streamCount", len(manager.streams))
+
manager.streamsLock.Unlock()
+
}
+
}()
+
return manager
}
// doesnt lock streams!!!
···
func (manager *StreamManager) updateOpts() {
chunks, userCount := manager.chunkedOpts()
+
logger := manager.logger.With("userCount", userCount)
manager.streamsLock.Lock()
-
idsSeen := make(map[int]struct{}, 0)
+
idsSeen := make(map[int]struct{}, len(manager.streams))
// update existing streams or create new ones
for id, opts := range chunks {
idsSeen[id] = struct{}{}
···
continue
}
if err := stream.inner.SendOptionsUpdate(opts); err != nil {
-
manager.logger.Error("couldnt update follow stream opts", "error", err, "streamId", id)
+
logger.Error("couldnt update follow stream opts", "error", err, "streamId", id)
}
} else {
manager.startSingle(id, opts)
···
}
}
manager.streamsLock.Unlock()
-
manager.logger.Info("updated opts", "userCount", userCount)
+
logger.Info("updated opts")
}
type HandleEvent func(context.Context, *models.Event) error
···
func startJetstreamLoop(ctx context.Context, logger *slog.Logger, outStream **client.Client, name string, handleEvent HandleEvent, opts models.SubscriberOptionsUpdatePayload) {
backoff := time.Second
for {
-
done := make(chan struct{})
-
if ctx.Err() != nil {
-
break
-
}
-
stream, startFn, err := startJetstreamClient(ctx, logger, name, handleEvent)
-
*outStream = stream
-
if startFn != nil {
-
logger.Info("starting jetstream client", "collections", opts.WantedCollections, "userCount", len(opts.WantedDIDs))
-
go func() {
-
err = startFn()
-
done <- struct{}{}
-
}()
-
// HACK: we need to wait for the websocket connection to start here. so we do
-
// need to upstream something to jetstream client
-
time.Sleep(time.Second * 2)
-
err = stream.SendOptionsUpdate(opts)
-
if err == nil {
-
<-done
+
select {
+
case <-ctx.Done():
+
return
+
default:
+
streamDone := make(chan struct{})
+
stream, startFn, err := startJetstreamClient(ctx, logger, name, handleEvent)
+
*outStream = stream
+
if startFn != nil {
+
logger.Info("starting jetstream client", "collections", opts.WantedCollections, "userCount", len(opts.WantedDIDs))
+
go func() {
+
err = startFn()
+
streamDone <- struct{}{}
+
}()
+
// HACK: we need to wait for the websocket connection to start here. so we do
+
// need to upstream something to jetstream client
+
time.Sleep(time.Second * 2)
+
go func() {
+
// HACK: also silly because it panics if the connection isnt established yet (why????????????)
+
defer func() {
+
panic := recover()
+
if panic != nil {
+
err = fmt.Errorf("%s", panic)
+
}
+
}()
+
err = stream.SendOptionsUpdate(opts)
+
}()
+
if err == nil {
+
<-streamDone
+
}
}
-
}
-
if err != nil {
-
logger.Error("stream failed", "error", err, "backoff", backoff)
-
time.Sleep(backoff)
-
backoff = backoff * 2
-
} else {
-
backoff = time.Second
+
if err != nil {
+
logger.Error("stream failed", "error", err, "backoff", backoff)
+
time.Sleep(backoff)
+
backoff = backoff * 2
+
} else {
+
backoff = time.Second
+
}
}
}
}
+2 -2
server/main.go
···
subscribers = hashmap.New[string, *SubscriberData]()
actorData = hashmap.New[syntax.DID, *ActorData]()
-
likeStreams StreamManager
-
followStreams StreamManager
+
likeStreams *StreamManager
+
followStreams *StreamManager
upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {