package atshorter

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"time"

	"github.com/bluesky-social/jetstream/pkg/client"
	"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
	"github.com/bluesky-social/jetstream/pkg/models"
)

// loggerOrDefault returns l, or slog.Default() when l is nil, so that values
// built without a logger can still log.
func loggerOrDefault(l *slog.Logger) *slog.Logger {
	if l == nil {
		return slog.Default()
	}
	return l
}

// consumer bundles the Jetstream client configuration, the event handler and
// the logger used while consuming.
type consumer struct {
	cfg     *client.ClientConfig
	handler handler
	logger  *slog.Logger
}

// NewConsumer builds a consumer that requests only the
// "com.atshorter.shorturl" collection for the single DID did.
//
// An empty jsAddr leaves the websocket URL from client.DefaultClientConfig
// untouched. Events are forwarded to store; logger is used both by the
// consumer and by its event handler.
func NewConsumer(jsAddr string, logger *slog.Logger, store HandlerStore, did string) *consumer {
	cfg := client.DefaultClientConfig()
	if jsAddr != "" {
		cfg.WebsocketURL = jsAddr
	}
	cfg.WantedCollections = []string{
		"com.atshorter.shorturl",
	}
	cfg.WantedDids = []string{did}

	return &consumer{
		cfg:    cfg,
		logger: logger,
		handler: handler{
			store:  store,
			logger: logger,
		},
	}
}

// Consume creates a sequential scheduler and a Jetstream client, then calls
// ConnectAndRead with a cursor placed one minute before the current time
// (expressed in Unix microseconds).
//
// It returns a wrapped error when the client cannot be created or when
// ConnectAndRead fails, and nil once ConnectAndRead returns without error.
// The scheduler is shut down when Consume returns.
func (c *consumer) Consume(ctx context.Context) error {
	// cursorRewind is how far before "now" the initial cursor is placed.
	const cursorRewind = time.Minute

	logger := loggerOrDefault(c.logger)

	scheduler := sequential.NewScheduler("jetstream_at_shorter_url", logger, c.handler.HandleEvent)
	defer scheduler.Shutdown()

	// Named jsClient so the imported client package is not shadowed.
	jsClient, err := client.NewClient(c.cfg, logger, scheduler)
	if err != nil {
		return fmt.Errorf("failed to create client: %w", err)
	}

	cursor := time.Now().Add(-cursorRewind).UnixMicro()
	if err := jsClient.ConnectAndRead(ctx, &cursor); err != nil {
		return fmt.Errorf("connect and read: %w", err)
	}

	logger.Info("stopping consume")
	return nil
}

// HandlerStore is the persistence interface the event handler writes to.
type HandlerStore interface {
	// CreateURL stores a short URL. The handler passes the commit's record key
	// as id, the event's DID as did, the record's origin as originHost and the
	// record's creation time in Unix milliseconds as createdAt.
	CreateURL(id, url, did, originHost string, createdAt int64) error
	// DeleteURL removes a short URL. The handler passes the commit's record key
	// as id and the event's DID as did.
	DeleteURL(id, did string) error
}

// handler applies Jetstream commit events to a HandlerStore.
type handler struct {
	store HandlerStore
	// logger may be nil; log falls back to slog.Default() in that case.
	logger *slog.Logger
}

// log returns the handler's logger, or the default logger when none was set.
func (h *handler) log() *slog.Logger {
	return loggerOrDefault(h.logger)
}

// HandleEvent dispatches create and delete commits to the matching handler.
// Events without a commit, and commits with any other operation, are ignored
// and yield nil.
func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error {
	if event.Commit == nil {
		return nil
	}

	switch event.Commit.Operation {
	case models.CommitOperationCreate:
		return h.handleCreateEvent(ctx, event)
	case models.CommitOperationDelete:
		return h.handleDeleteEvent(ctx, event)
	default:
		return nil
	}
}

// ShortURLRecord is the JSON shape decoded from a create commit's record.
type ShortURLRecord struct {
	URL       string    `json:"url"`
	CreatedAt time.Time `json:"createdAt"`
	Origin    string    `json:"origin"`
}

// handleCreateEvent decodes the commit record and stores it via CreateURL.
//
// Decode and store failures are logged and swallowed, so this always returns
// nil.
func (h *handler) handleCreateEvent(_ context.Context, event *models.Event) error {
	var record ShortURLRecord
	if err := json.Unmarshal(event.Commit.Record, &record); err != nil {
		h.log().Error("unmarshal record", "error", err)
		return nil
	}

	err := h.store.CreateURL(
		event.Commit.RKey,
		record.URL,
		event.Did,
		record.Origin,
		record.CreatedAt.UnixMilli(),
	)
	if err != nil {
		// TODO: proper error handling in case this fails, we want to try again
		h.log().Error("failed to store short URL", "error", err)
	}
	return nil
}

// handleDeleteEvent removes the short URL identified by the commit's record
// key and the event's DID.
//
// A store failure is logged and swallowed, so this always returns nil.
func (h *handler) handleDeleteEvent(_ context.Context, event *models.Event) error {
	err := h.store.DeleteURL(event.Commit.RKey, event.Did)
	if err != nil {
		// TODO: proper error handling in case this fails, we want to try again
		h.log().Error("failed to delete short URL from store", "error", err)
	}
	return nil
}