A URL shortener service that uses ATProto to allow self-hosting and to ensure that users own their data.
1package atshorter
2
3import (
4 "context"
5 "encoding/json"
6
7 "fmt"
8 "log/slog"
9 "time"
10
11 "github.com/bluesky-social/jetstream/pkg/client"
12 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
13 "github.com/bluesky-social/jetstream/pkg/models"
14)
15
16type consumer struct {
17 cfg *client.ClientConfig
18 handler handler
19 logger *slog.Logger
20}
21
22func NewConsumer(jsAddr string, logger *slog.Logger, store HandlerStore, did string) *consumer {
23 cfg := client.DefaultClientConfig()
24 if jsAddr != "" {
25 cfg.WebsocketURL = jsAddr
26 }
27 cfg.WantedCollections = []string{
28 "com.atshorter.shorturl",
29 }
30 cfg.WantedDids = []string{did}
31
32 return &consumer{
33 cfg: cfg,
34 logger: logger,
35 handler: handler{
36 store: store,
37 },
38 }
39}
40
41func (c *consumer) Consume(ctx context.Context) error {
42 scheduler := sequential.NewScheduler("jetstream_at_shorter_url", c.logger, c.handler.HandleEvent)
43 defer scheduler.Shutdown()
44
45 client, err := client.NewClient(c.cfg, c.logger, scheduler)
46 if err != nil {
47 return fmt.Errorf("failed to create client: %w", err)
48 }
49
50 cursor := time.Now().Add(1 * -time.Minute).UnixMicro()
51
52 if err := client.ConnectAndRead(ctx, &cursor); err != nil {
53 return fmt.Errorf("connect and read: %w", err)
54 }
55
56 slog.Info("stopping consume")
57 return nil
58}
59
// HandlerStore is the persistence dependency of the event handler.
type HandlerStore interface {
	// CreateURL stores a short URL. The handler passes the commit's record
	// key as id, the record's URL and origin as url and originHost, the
	// event's DID as did, and the record's creation time in Unix
	// milliseconds as createdAt.
	CreateURL(id string, url string, did string, originHost string, createdAt int64) error

	// DeleteURL removes a short URL. The handler passes the commit's record
	// key as id and the event's DID as did.
	DeleteURL(id string, did string) error
}
64
65type handler struct {
66 store HandlerStore
67}
68
69func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error {
70 if event.Commit == nil {
71 return nil
72 }
73
74 switch event.Commit.Operation {
75 case models.CommitOperationCreate:
76 return h.handleCreateEvent(ctx, event)
77 case models.CommitOperationDelete:
78 return h.handleDeleteEvent(ctx, event)
79 default:
80 return nil
81 }
82}
83
// ShortURLRecord is the JSON shape that handleCreateEvent decodes from a
// commit's record payload.
type ShortURLRecord struct {
	// URL is decoded from the "url" key and passed to the store as the url
	// argument of CreateURL.
	URL string `json:"url"`
	// CreatedAt is decoded from the "createdAt" key; the handler converts it
	// to Unix milliseconds before storing it.
	CreatedAt time.Time `json:"createdAt"`
	// Origin is decoded from the "origin" key and passed to the store as the
	// originHost argument of CreateURL.
	Origin string `json:"origin"`
}
89
90func (h *handler) handleCreateEvent(_ context.Context, event *models.Event) error {
91 var record ShortURLRecord
92 if err := json.Unmarshal(event.Commit.Record, &record); err != nil {
93 slog.Error("unmarshal record", "error", err)
94 return nil
95 }
96
97 err := h.store.CreateURL(event.Commit.RKey, record.URL, event.Did, record.Origin, record.CreatedAt.UnixMilli())
98 if err != nil {
99 // TODO: proper error handling in case this fails, we want to try again
100 slog.Error("failed to store short URL", "error", err)
101 }
102
103 return nil
104}
105
106func (h *handler) handleDeleteEvent(_ context.Context, event *models.Event) error {
107 err := h.store.DeleteURL(event.Commit.RKey, event.Did)
108 if err != nil {
109 // TODO: proper error handling in case this fails, we want to try again
110 slog.Error("failed to delete short URL from store", "error", err)
111 }
112
113 return nil
114}