1package photocopy
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "net/http"
8 "net/url"
9 "os"
10 "strings"
11 "time"
12
13 "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/events"
16 "github.com/bluesky-social/indigo/events/schedulers/parallel"
17 "github.com/bluesky-social/indigo/repo"
18 "github.com/bluesky-social/indigo/repomgr"
19 "github.com/gorilla/websocket"
20 "github.com/ipfs/go-cid"
21)
22
23func (p *Photocopy) startConsumer(ctx context.Context, cancel context.CancelFunc) error {
24 defer cancel()
25
26 go func() {
27 ticker := time.NewTicker(5 * time.Second)
28 defer ticker.Stop()
29
30 for range ticker.C {
31 if err := os.WriteFile(p.cursorFile, []byte(p.cursor), 0644); err != nil {
32 p.logger.Error("error saving cursor", "error", err)
33 }
34 p.logger.Debug("saving cursor", "seq", p.cursor)
35 }
36 }()
37
38 u, err := url.Parse(p.relayHost)
39 if err != nil {
40 return err
41 }
42 u.Path = "/xrpc/com.atproto.sync.subscribeRepos"
43
44 prevCursor, err := p.loadCursor()
45 if err != nil {
46 if !os.IsNotExist(err) {
47 panic(err)
48 }
49 } else {
50 p.cursor = prevCursor
51 }
52
53 if prevCursor != "" {
54 u.RawQuery = "cursor=" + prevCursor
55 }
56
57 rsc := events.RepoStreamCallbacks{
58 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
59 go p.repoCommit(ctx, evt)
60 return nil
61 },
62 }
63
64 d := websocket.DefaultDialer
65
66 p.logger.Info("connecting to relay", "url", u.String())
67
68 con, _, err := d.Dial(u.String(), http.Header{
69 "user-agent": []string{"photocopy/0.0.0"},
70 })
71 if err != nil {
72 return fmt.Errorf("failed to connect to relay: %w", err)
73 }
74
75 scheduler := parallel.NewScheduler(400, 10, con.RemoteAddr().String(), rsc.EventHandler)
76
77 if err := events.HandleRepoStream(ctx, con, scheduler, p.logger); err != nil {
78 p.logger.Error("repo stream failed", "error", err)
79 }
80
81 p.logger.Info("repo stream shut down")
82
83 return nil
84}
85
86func (p *Photocopy) repoCommit(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) {
87 p.cursor = fmt.Sprintf("%d", evt.Seq)
88
89 if evt.TooBig {
90 p.logger.Warn("commit too big", "repo", evt.Repo, "seq", evt.Seq)
91 return
92 }
93
94 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
95 if err != nil {
96 p.logger.Error("failed to read event repo", "error", err)
97 return
98 }
99
100 did, err := syntax.ParseDID(evt.Repo)
101 if err != nil {
102 p.logger.Error("failed to parse did", "error", err)
103 return
104 }
105
106 for _, op := range evt.Ops {
107 collection, rkey, err := syntax.ParseRepoPath(op.Path)
108 if err != nil {
109 p.logger.Error("invalid path in repo op")
110 continue
111 }
112
113 ek := repomgr.EventKind(op.Action)
114
115 switch ek {
116 case repomgr.EvtKindCreateRecord:
117 if op.Cid == nil {
118 p.logger.Warn("op missing reccid", "path", op.Path, "action", op.Action)
119 continue
120 }
121
122 c := (cid.Cid)(*op.Cid)
123 reccid, rec, err := r.GetRecordBytes(ctx, op.Path)
124 if err != nil {
125 p.logger.Error("failed to get record bytes", "error", err, "path", op.Path)
126 continue
127 }
128
129 if c != reccid {
130 p.logger.Warn("reccid mismatch", "from_event", c, "from_blocks", reccid, "path", op.Path)
131 continue
132 }
133
134 if rec == nil {
135 p.logger.Warn("record not found", "reccid", c, "path", op.Path)
136 continue
137 }
138
139 if !strings.HasPrefix(collection.String(), "app.bsky.") {
140 continue
141 }
142
143 if err := p.handleCreate(ctx, *rec, evt.Time, evt.Rev, did.String(), collection.String(), rkey.String(), reccid.String()); err != nil {
144 p.logger.Error("error handling create event", "error", err)
145 continue
146 }
147 case repomgr.EvtKindDeleteRecord:
148 if !strings.HasPrefix(collection.String(), "app.bsky.") {
149 continue
150 }
151
152 if err := p.handleDelete(ctx, did.String(), collection.String(), rkey.String()); err != nil {
153 p.logger.Error("error handling delete event", "error", err)
154 continue
155 }
156 }
157 }
158}
159
160func (p *Photocopy) loadCursor() (string, error) {
161 b, err := os.ReadFile(p.cursorFile)
162 if err != nil {
163 return "", err
164 }
165 return string(b), nil
166}