this repo has no description
1package photocopy 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "net/http" 8 "net/url" 9 "os" 10 "strings" 11 "time" 12 13 "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/events" 16 "github.com/bluesky-social/indigo/events/schedulers/parallel" 17 "github.com/bluesky-social/indigo/repo" 18 "github.com/bluesky-social/indigo/repomgr" 19 "github.com/gorilla/websocket" 20 "github.com/ipfs/go-cid" 21) 22 23func (p *Photocopy) startConsumer(ctx context.Context, cancel context.CancelFunc) error { 24 defer cancel() 25 26 go func() { 27 ticker := time.NewTicker(5 * time.Second) 28 defer ticker.Stop() 29 30 for range ticker.C { 31 if err := os.WriteFile(p.cursorFile, []byte(p.cursor), 0644); err != nil { 32 p.logger.Error("error saving cursor", "error", err) 33 } 34 p.logger.Debug("saving cursor", "seq", p.cursor) 35 } 36 }() 37 38 u, err := url.Parse(p.relayHost) 39 if err != nil { 40 return err 41 } 42 u.Path = "/xrpc/com.atproto.sync.subscribeRepos" 43 44 prevCursor, err := p.loadCursor() 45 if err != nil { 46 if !os.IsNotExist(err) { 47 panic(err) 48 } 49 } else { 50 p.cursor = prevCursor 51 } 52 53 if prevCursor != "" { 54 u.RawQuery = "cursor=" + prevCursor 55 } 56 57 rsc := events.RepoStreamCallbacks{ 58 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 59 go p.repoCommit(ctx, evt) 60 return nil 61 }, 62 } 63 64 d := websocket.DefaultDialer 65 66 p.logger.Info("connecting to relay", "url", u.String()) 67 68 con, _, err := d.Dial(u.String(), http.Header{ 69 "user-agent": []string{"photocopy/0.0.0"}, 70 }) 71 if err != nil { 72 return fmt.Errorf("failed to connect to relay: %w", err) 73 } 74 75 scheduler := parallel.NewScheduler(400, 10, con.RemoteAddr().String(), rsc.EventHandler) 76 77 if err := events.HandleRepoStream(ctx, con, scheduler, p.logger); err != nil { 78 p.logger.Error("repo stream failed", "error", err) 79 } 80 81 p.logger.Info("repo stream shut down") 82 83 return nil 84} 85 86func (p *Photocopy) repoCommit(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) { 87 p.cursor = fmt.Sprintf("%d", evt.Seq) 88 89 if evt.TooBig { 90 p.logger.Warn("commit too big", "repo", evt.Repo, "seq", evt.Seq) 91 return 92 } 93 94 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 95 if err != nil { 96 p.logger.Error("failed to read event repo", "error", err) 97 return 98 } 99 100 did, err := syntax.ParseDID(evt.Repo) 101 if err != nil { 102 p.logger.Error("failed to parse did", "error", err) 103 return 104 } 105 106 for _, op := range evt.Ops { 107 collection, rkey, err := syntax.ParseRepoPath(op.Path) 108 if err != nil { 109 p.logger.Error("invalid path in repo op") 110 continue 111 } 112 113 ek := repomgr.EventKind(op.Action) 114 115 switch ek { 116 case repomgr.EvtKindCreateRecord: 117 if op.Cid == nil { 118 p.logger.Warn("op missing reccid", "path", op.Path, "action", op.Action) 119 continue 120 } 121 122 c := (cid.Cid)(*op.Cid) 123 reccid, rec, err := r.GetRecordBytes(ctx, op.Path) 124 if err != nil { 125 p.logger.Error("failed to get record bytes", "error", err, "path", op.Path) 126 continue 127 } 128 129 if c != reccid { 130 p.logger.Warn("reccid mismatch", "from_event", c, "from_blocks", reccid, "path", op.Path) 131 continue 132 } 133 134 if rec == nil { 135 p.logger.Warn("record not found", "reccid", c, "path", op.Path) 136 continue 137 } 138 139 if !strings.HasPrefix(collection.String(), "app.bsky.") { 140 continue 141 } 142 143 if err := p.handleCreate(ctx, *rec, evt.Time, evt.Rev, did.String(), collection.String(), rkey.String(), reccid.String()); err != nil { 144 p.logger.Error("error handling create event", "error", err) 145 continue 146 } 147 case repomgr.EvtKindDeleteRecord: 148 if !strings.HasPrefix(collection.String(), "app.bsky.") { 149 continue 150 } 151 152 if err := p.handleDelete(ctx, did.String(), collection.String(), rkey.String()); err != nil { 153 p.logger.Error("error handling delete event", "error", err) 154 continue 155 } 156 } 157 } 158} 159 160func (p *Photocopy) loadCursor() (string, error) { 161 b, err := os.ReadFile(p.cursorFile) 162 if err != nil { 163 return "", err 164 } 165 return string(b), nil 166}