this repo has no description
at main 3.8 kB view raw
1package photocopy 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "net/http" 8 "net/url" 9 "os" 10 "time" 11 12 "github.com/bluesky-social/indigo/api/atproto" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/indigo/events" 15 "github.com/bluesky-social/indigo/events/schedulers/parallel" 16 "github.com/bluesky-social/indigo/repo" 17 "github.com/bluesky-social/indigo/repomgr" 18 "github.com/gorilla/websocket" 19 "github.com/ipfs/go-cid" 20) 21 22func (p *Photocopy) startConsumer(ctx context.Context, cancel context.CancelFunc) error { 23 defer cancel() 24 25 go func() { 26 ticker := time.NewTicker(5 * time.Second) 27 defer ticker.Stop() 28 29 for range ticker.C { 30 if err := os.WriteFile(p.cursorFile, []byte(p.cursor), 0644); err != nil { 31 p.logger.Error("error saving cursor", "error", err) 32 } 33 p.logger.Debug("saving cursor", "seq", p.cursor) 34 } 35 }() 36 37 u, err := url.Parse(p.relayHost) 38 if err != nil { 39 return err 40 } 41 u.Path = "/xrpc/com.atproto.sync.subscribeRepos" 42 43 prevCursor, err := p.loadCursor() 44 if err != nil { 45 if !os.IsNotExist(err) { 46 panic(err) 47 } 48 } else { 49 p.cursor = prevCursor 50 } 51 52 if prevCursor != "" { 53 u.RawQuery = "cursor=" + prevCursor 54 } 55 56 rsc := events.RepoStreamCallbacks{ 57 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 58 go p.repoCommit(ctx, evt) 59 return nil 60 }, 61 } 62 63 d := websocket.DefaultDialer 64 65 p.logger.Info("connecting to relay", "url", u.String()) 66 67 con, _, err := d.Dial(u.String(), http.Header{ 68 "user-agent": []string{"photocopy/0.0.0"}, 69 }) 70 if err != nil { 71 return fmt.Errorf("failed to connect to relay: %w", err) 72 } 73 74 scheduler := parallel.NewScheduler(400, 10, con.RemoteAddr().String(), rsc.EventHandler) 75 76 if err := events.HandleRepoStream(ctx, con, scheduler, p.logger); err != nil { 77 p.logger.Error("repo stream failed", "error", err) 78 } 79 80 p.logger.Info("repo stream shut down") 81 82 return nil 83} 84 85func (p *Photocopy) repoCommit(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) { 86 p.cursor = fmt.Sprintf("%d", evt.Seq) 87 88 if evt.TooBig { 89 p.logger.Warn("commit too big", "repo", evt.Repo, "seq", evt.Seq) 90 return 91 } 92 93 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 94 if err != nil { 95 p.logger.Error("failed to read event repo", "error", err) 96 return 97 } 98 99 did, err := syntax.ParseDID(evt.Repo) 100 if err != nil { 101 p.logger.Error("failed to parse did", "error", err) 102 return 103 } 104 105 for _, op := range evt.Ops { 106 collection, rkey, err := syntax.ParseRepoPath(op.Path) 107 if err != nil { 108 p.logger.Error("invalid path in repo op") 109 continue 110 } 111 112 ek := repomgr.EventKind(op.Action) 113 114 switch ek { 115 case repomgr.EvtKindCreateRecord: 116 if op.Cid == nil { 117 p.logger.Warn("op missing reccid", "path", op.Path, "action", op.Action) 118 continue 119 } 120 121 c := (cid.Cid)(*op.Cid) 122 reccid, rec, err := r.GetRecordBytes(ctx, op.Path) 123 if err != nil { 124 p.logger.Error("failed to get record bytes", "error", err, "path", op.Path) 125 continue 126 } 127 128 if c != reccid { 129 p.logger.Warn("reccid mismatch", "from_event", c, "from_blocks", reccid, "path", op.Path) 130 continue 131 } 132 133 if rec == nil { 134 p.logger.Warn("record not found", "reccid", c, "path", op.Path) 135 continue 136 } 137 138 if err := p.handleCreate(ctx, *rec, evt.Time, evt.Rev, did.String(), collection.String(), rkey.String(), reccid.String(), fmt.Sprintf("%d", evt.Seq)); err != nil { 139 p.logger.Error("error handling create event", "error", err) 140 continue 141 } 142 case repomgr.EvtKindDeleteRecord: 143 if err := p.handleDelete(ctx, did.String(), collection.String(), rkey.String()); err != nil { 144 p.logger.Error("error handling delete event", "error", err) 145 continue 146 } 147 } 148 } 149} 150 151func (p *Photocopy) loadCursor() (string, error) { 152 b, err := os.ReadFile(p.cursorFile) 153 if err != nil { 154 return "", err 155 } 156 return string(b), nil 157}