this repo has no description
1package photocopy 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "strings" 8 "time" 9 10 "github.com/araddon/dateparse" 11 "github.com/bluesky-social/indigo/api/bsky" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "github.com/haileyok/photocopy/models" 14) 15 16func (p *Photocopy) handleCreate(ctx context.Context, recb []byte, indexedAt, rev, did, collection, rkey, cid, seq string) error { 17 iat, err := dateparse.ParseAny(indexedAt) 18 if err != nil { 19 return err 20 } 21 22 if err := p.handleCreateRecord(ctx, did, rkey, collection, cid, recb, seq); err != nil { 23 p.logger.Error("error creating record", "error", err) 24 } 25 26 switch collection { 27 case "app.bsky.feed.post": 28 return p.handleCreatePost(ctx, rev, recb, uriFromParts(did, collection, rkey), did, collection, rkey, cid, iat) 29 case "app.bsky.graph.follow": 30 return p.handleCreateFollow(ctx, recb, uriFromParts(did, collection, rkey), did, rkey, iat) 31 case "app.bsky.feed.like", "app.bsky.feed.repost": 32 return p.handleCreateInteraction(ctx, recb, uriFromParts(did, collection, rkey), did, collection, rkey, iat) 33 default: 34 return nil 35 } 36} 37 38func (p *Photocopy) handleCreateRecord(ctx context.Context, did, rkey, collection, cid string, raw []byte, seq string) error { 39 var cat time.Time 40 prkey, err := syntax.ParseTID(rkey) 41 if err == nil { 42 cat = prkey.Time() 43 } else { 44 cat = time.Now() 45 } 46 47 rec := models.Record{ 48 Did: did, 49 Rkey: rkey, 50 Collection: collection, 51 Cid: cid, 52 Seq: seq, 53 Raw: string(raw), 54 CreatedAt: cat, 55 } 56 57 if err := p.inserters.recordsInserter.Insert(ctx, rec); err != nil { 58 return err 59 } 60 61 return nil 62} 63 64func (p *Photocopy) handleCreatePost(ctx context.Context, rev string, recb []byte, uri, did, collection, rkey, cid string, indexedAt time.Time) error { 65 var rec bsky.FeedPost 66 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 67 return err 68 } 69 70 cat, err := parseTimeFromRecord(rec, rkey) 71 if err != nil { 72 return err 73 } 74 75 lang := "" 76 if len(rec.Langs) != 0 { 77 lang = rec.Langs[0] 78 } 79 80 post := models.Post{ 81 Uri: uri, 82 Rkey: rkey, 83 CreatedAt: *cat, 84 IndexedAt: indexedAt, 85 Did: did, 86 Lang: lang, 87 } 88 89 if rec.Reply != nil { 90 if rec.Reply.Parent != nil { 91 aturi, err := syntax.ParseATURI(rec.Reply.Parent.Uri) 92 if err != nil { 93 return fmt.Errorf("error parsing at-uri: %w", err) 94 95 } 96 post.ParentDid = aturi.Authority().String() 97 post.ParentUri = rec.Reply.Parent.Uri 98 } 99 if rec.Reply.Root != nil { 100 aturi, err := syntax.ParseATURI(rec.Reply.Root.Uri) 101 if err != nil { 102 return fmt.Errorf("error parsing at-uri: %w", err) 103 104 } 105 post.RootDid = aturi.Authority().String() 106 post.RootUri = rec.Reply.Root.Uri 107 } 108 } 109 110 if rec.Embed != nil && rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil { 111 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecord.Record.Uri) 112 if err != nil { 113 return fmt.Errorf("error parsing at-uri: %w", err) 114 115 } 116 post.QuoteDid = aturi.Authority().String() 117 post.QuoteUri = rec.Embed.EmbedRecord.Record.Uri 118 } else if rec.Embed != nil && rec.Embed.EmbedRecordWithMedia != nil && rec.Embed.EmbedRecordWithMedia.Record != nil && rec.Embed.EmbedRecordWithMedia.Record.Record != nil { 119 aturi, err := syntax.ParseATURI(rec.Embed.EmbedRecordWithMedia.Record.Record.Uri) 120 if err != nil { 121 return fmt.Errorf("error parsing at-uri: %w", err) 122 123 } 124 post.QuoteDid = aturi.Authority().String() 125 post.QuoteUri = rec.Embed.EmbedRecordWithMedia.Record.Record.Uri 126 } 127 128 if err := p.inserters.postsInserter.Insert(ctx, post); err != nil { 129 return err 130 } 131 132 return nil 133} 134 135func (p *Photocopy) handleCreateFollow(ctx context.Context, recb []byte, uri, did, rkey string, indexedAt time.Time) error { 136 var rec bsky.GraphFollow 137 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 138 return err 139 } 140 141 cat, err := parseTimeFromRecord(rec, rkey) 142 if err != nil { 143 return err 144 } 145 146 follow := models.Follow{ 147 Uri: uri, 148 Did: did, 149 Rkey: rkey, 150 CreatedAt: *cat, 151 IndexedAt: indexedAt, 152 Subject: rec.Subject, 153 } 154 155 if err := p.inserters.followsInserter.Insert(ctx, follow); err != nil { 156 return err 157 } 158 159 return nil 160} 161 162func (p *Photocopy) handleCreateInteraction(ctx context.Context, recb []byte, uri, did, collection, rkey string, indexedAt time.Time) error { 163 colPts := strings.Split(collection, ".") 164 if len(colPts) < 4 { 165 return fmt.Errorf("invalid collection type %s", collection) 166 } 167 168 interaction := models.Interaction{ 169 Uri: uri, 170 Kind: colPts[3], 171 Rkey: rkey, 172 IndexedAt: indexedAt, 173 Did: did, 174 SubjectUri: uri, 175 SubjectDid: did, 176 } 177 178 switch collection { 179 case "app.bsky.feed.like": 180 var rec bsky.FeedLike 181 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 182 return err 183 } 184 185 cat, err := parseTimeFromRecord(rec, rkey) 186 if err != nil { 187 return err 188 } 189 190 if rec.Subject == nil { 191 return fmt.Errorf("invalid subject in like") 192 } 193 194 aturi, err := syntax.ParseATURI(rec.Subject.Uri) 195 if err != nil { 196 return fmt.Errorf("error parsing at-uri: %w", err) 197 198 } 199 200 interaction.SubjectDid = aturi.Authority().String() 201 interaction.SubjectUri = rec.Subject.Uri 202 interaction.CreatedAt = *cat 203 case "app.bsky.feed.repost": 204 var rec bsky.FeedRepost 205 if err := rec.UnmarshalCBOR(bytes.NewReader(recb)); err != nil { 206 return err 207 } 208 209 cat, err := parseTimeFromRecord(rec, rkey) 210 if err != nil { 211 return err 212 } 213 214 if rec.Subject == nil { 215 return fmt.Errorf("invalid subject in repost") 216 } 217 218 aturi, err := syntax.ParseATURI(rec.Subject.Uri) 219 if err != nil { 220 return fmt.Errorf("error parsing at-uri: %w", err) 221 222 } 223 224 interaction.SubjectDid = aturi.Authority().String() 225 interaction.SubjectUri = rec.Subject.Uri 226 interaction.CreatedAt = *cat 227 } 228 229 if err := p.inserters.interactionsInserter.Insert(ctx, interaction); err != nil { 230 return err 231 } 232 233 return nil 234} 235 236func parseTimeFromRecord(rec any, rkey string) (*time.Time, error) { 237 var rkeyTime time.Time 238 if rkey != "self" { 239 rt, err := syntax.ParseTID(rkey) 240 if err == nil { 241 rkeyTime = rt.Time() 242 } 243 } 244 switch rec := rec.(type) { 245 case *bsky.FeedPost: 246 t, err := dateparse.ParseAny(rec.CreatedAt) 247 if err != nil { 248 return nil, err 249 } 250 251 if inRange(t) { 252 return &t, nil 253 } 254 255 if rkeyTime.IsZero() || !inRange(rkeyTime) { 256 return timePtr(time.Now()), nil 257 } 258 259 return &rkeyTime, nil 260 case *bsky.FeedRepost: 261 t, err := dateparse.ParseAny(rec.CreatedAt) 262 if err != nil { 263 return nil, err 264 } 265 266 if inRange(t) { 267 return timePtr(t), nil 268 } 269 270 if rkeyTime.IsZero() { 271 return nil, fmt.Errorf("failed to get a useful timestamp from record") 272 } 273 274 return &rkeyTime, nil 275 case *bsky.FeedLike: 276 t, err := dateparse.ParseAny(rec.CreatedAt) 277 if err != nil { 278 return nil, err 279 } 280 281 if inRange(t) { 282 return timePtr(t), nil 283 } 284 285 if rkeyTime.IsZero() { 286 return nil, fmt.Errorf("failed to get a useful timestamp from record") 287 } 288 289 return &rkeyTime, nil 290 case *bsky.ActorProfile: 291 // We can't really trust the createdat in the profile record anyway, and its very possible its missing. just use iat for this one 292 return timePtr(time.Now()), nil 293 case *bsky.FeedGenerator: 294 if !rkeyTime.IsZero() && inRange(rkeyTime) { 295 return &rkeyTime, nil 296 } 297 return timePtr(time.Now()), nil 298 default: 299 if !rkeyTime.IsZero() && inRange(rkeyTime) { 300 return &rkeyTime, nil 301 } 302 return timePtr(time.Now()), nil 303 } 304} 305 306func inRange(t time.Time) bool { 307 now := time.Now() 308 if t.Before(now) { 309 return now.Sub(t) <= time.Hour*24*365*5 310 } 311 return t.Sub(now) <= time.Hour*24*200 312} 313 314func timePtr(t time.Time) *time.Time { 315 return &t 316}