A URL shortener service that uses ATProto to allow self-hosting and to ensure that users own their data
at main 2.8 kB view raw
1package atshorter 2 3import ( 4 "context" 5 "encoding/json" 6 7 "fmt" 8 "log/slog" 9 "time" 10 11 "github.com/bluesky-social/jetstream/pkg/client" 12 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 13 "github.com/bluesky-social/jetstream/pkg/models" 14) 15 16type consumer struct { 17 cfg *client.ClientConfig 18 handler handler 19 logger *slog.Logger 20} 21 22func NewConsumer(jsAddr string, logger *slog.Logger, store HandlerStore, did string) *consumer { 23 cfg := client.DefaultClientConfig() 24 if jsAddr != "" { 25 cfg.WebsocketURL = jsAddr 26 } 27 cfg.WantedCollections = []string{ 28 "com.atshorter.shorturl", 29 } 30 cfg.WantedDids = []string{did} 31 32 return &consumer{ 33 cfg: cfg, 34 logger: logger, 35 handler: handler{ 36 store: store, 37 }, 38 } 39} 40 41func (c *consumer) Consume(ctx context.Context) error { 42 scheduler := sequential.NewScheduler("jetstream_at_shorter_url", c.logger, c.handler.HandleEvent) 43 defer scheduler.Shutdown() 44 45 client, err := client.NewClient(c.cfg, c.logger, scheduler) 46 if err != nil { 47 return fmt.Errorf("failed to create client: %w", err) 48 } 49 50 cursor := time.Now().Add(1 * -time.Minute).UnixMicro() 51 52 if err := client.ConnectAndRead(ctx, &cursor); err != nil { 53 return fmt.Errorf("connect and read: %w", err) 54 } 55 56 slog.Info("stopping consume") 57 return nil 58} 59 60type HandlerStore interface { 61 CreateURL(id, url, did, originHost string, createdAt int64) error 62 DeleteURL(id, did string) error 63} 64 65type handler struct { 66 store HandlerStore 67} 68 69func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error { 70 if event.Commit == nil { 71 return nil 72 } 73 74 switch event.Commit.Operation { 75 case models.CommitOperationCreate: 76 return h.handleCreateEvent(ctx, event) 77 case models.CommitOperationDelete: 78 return h.handleDeleteEvent(ctx, event) 79 default: 80 return nil 81 } 82} 83 84type ShortURLRecord struct { 85 URL string `json:"url"` 86 CreatedAt 
time.Time `json:"createdAt"` 87 Origin string `json:"origin"` 88} 89 90func (h *handler) handleCreateEvent(_ context.Context, event *models.Event) error { 91 var record ShortURLRecord 92 if err := json.Unmarshal(event.Commit.Record, &record); err != nil { 93 slog.Error("unmarshal record", "error", err) 94 return nil 95 } 96 97 err := h.store.CreateURL(event.Commit.RKey, record.URL, event.Did, record.Origin, record.CreatedAt.UnixMilli()) 98 if err != nil { 99 // TODO: proper error handling in case this fails, we want to try again 100 slog.Error("failed to store short URL", "error", err) 101 } 102 103 return nil 104} 105 106func (h *handler) handleDeleteEvent(_ context.Context, event *models.Event) error { 107 err := h.store.DeleteURL(event.Commit.RKey, event.Did) 108 if err != nil { 109 // TODO: proper error handling in case this fails, we want to try again 110 slog.Error("failed to delete short URL from store", "error", err) 111 } 112 113 return nil 114}