1package photocopy
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "net/http"
8 "net/url"
9 "os"
10 "time"
11
12 "github.com/bluesky-social/indigo/api/atproto"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/bluesky-social/indigo/events"
15 "github.com/bluesky-social/indigo/events/schedulers/parallel"
16 "github.com/bluesky-social/indigo/repo"
17 "github.com/bluesky-social/indigo/repomgr"
18 "github.com/gorilla/websocket"
19 "github.com/ipfs/go-cid"
20)
21
22func (p *Photocopy) startConsumer(ctx context.Context, cancel context.CancelFunc) error {
23 defer cancel()
24
25 go func() {
26 ticker := time.NewTicker(5 * time.Second)
27 defer ticker.Stop()
28
29 for range ticker.C {
30 if err := os.WriteFile(p.cursorFile, []byte(p.cursor), 0644); err != nil {
31 p.logger.Error("error saving cursor", "error", err)
32 }
33 p.logger.Debug("saving cursor", "seq", p.cursor)
34 }
35 }()
36
37 u, err := url.Parse(p.relayHost)
38 if err != nil {
39 return err
40 }
41 u.Path = "/xrpc/com.atproto.sync.subscribeRepos"
42
43 prevCursor, err := p.loadCursor()
44 if err != nil {
45 if !os.IsNotExist(err) {
46 panic(err)
47 }
48 } else {
49 p.cursor = prevCursor
50 }
51
52 if prevCursor != "" {
53 u.RawQuery = "cursor=" + prevCursor
54 }
55
56 rsc := events.RepoStreamCallbacks{
57 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
58 go p.repoCommit(ctx, evt)
59 return nil
60 },
61 }
62
63 d := websocket.DefaultDialer
64
65 p.logger.Info("connecting to relay", "url", u.String())
66
67 con, _, err := d.Dial(u.String(), http.Header{
68 "user-agent": []string{"photocopy/0.0.0"},
69 })
70 if err != nil {
71 return fmt.Errorf("failed to connect to relay: %w", err)
72 }
73
74 scheduler := parallel.NewScheduler(400, 10, con.RemoteAddr().String(), rsc.EventHandler)
75
76 if err := events.HandleRepoStream(ctx, con, scheduler, p.logger); err != nil {
77 p.logger.Error("repo stream failed", "error", err)
78 }
79
80 p.logger.Info("repo stream shut down")
81
82 return nil
83}
84
85func (p *Photocopy) repoCommit(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) {
86 p.cursor = fmt.Sprintf("%d", evt.Seq)
87
88 if evt.TooBig {
89 p.logger.Warn("commit too big", "repo", evt.Repo, "seq", evt.Seq)
90 return
91 }
92
93 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
94 if err != nil {
95 p.logger.Error("failed to read event repo", "error", err)
96 return
97 }
98
99 did, err := syntax.ParseDID(evt.Repo)
100 if err != nil {
101 p.logger.Error("failed to parse did", "error", err)
102 return
103 }
104
105 for _, op := range evt.Ops {
106 collection, rkey, err := syntax.ParseRepoPath(op.Path)
107 if err != nil {
108 p.logger.Error("invalid path in repo op")
109 continue
110 }
111
112 ek := repomgr.EventKind(op.Action)
113
114 switch ek {
115 case repomgr.EvtKindCreateRecord:
116 if op.Cid == nil {
117 p.logger.Warn("op missing reccid", "path", op.Path, "action", op.Action)
118 continue
119 }
120
121 c := (cid.Cid)(*op.Cid)
122 reccid, rec, err := r.GetRecordBytes(ctx, op.Path)
123 if err != nil {
124 p.logger.Error("failed to get record bytes", "error", err, "path", op.Path)
125 continue
126 }
127
128 if c != reccid {
129 p.logger.Warn("reccid mismatch", "from_event", c, "from_blocks", reccid, "path", op.Path)
130 continue
131 }
132
133 if rec == nil {
134 p.logger.Warn("record not found", "reccid", c, "path", op.Path)
135 continue
136 }
137
138 if err := p.handleCreate(ctx, *rec, evt.Time, evt.Rev, did.String(), collection.String(), rkey.String(), reccid.String(), fmt.Sprintf("%d", evt.Seq)); err != nil {
139 p.logger.Error("error handling create event", "error", err)
140 continue
141 }
142 case repomgr.EvtKindDeleteRecord:
143 if err := p.handleDelete(ctx, did.String(), collection.String(), rkey.String()); err != nil {
144 p.logger.Error("error handling delete event", "error", err)
145 continue
146 }
147 }
148 }
149}
150
151func (p *Photocopy) loadCursor() (string, error) {
152 b, err := os.ReadFile(p.cursorFile)
153 if err != nil {
154 return "", err
155 }
156 return string(b), nil
157}