an app.bsky.* indexer
1package main 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "log/slog" 9 "strconv" 10 "strings" 11 12 comatproto "github.com/bluesky-social/indigo/api/atproto" 13 appbsky "github.com/bluesky-social/indigo/api/bsky" 14 "github.com/bluesky-social/indigo/backfill" 15 "github.com/ipfs/go-cid" 16) 17 18type commitHandler func(context.Context, *comatproto.SyncSubscribeRepos_Commit) error 19 20func (b *Backend) RepoCommitHandler( 21 ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit, 22) error { 23 select { 24 case <-ctx.Done(): 25 return nil 26 default: 27 // 28 } 29 30 b.firehoseLk.Lock() 31 b.firehoseSeq = strconv.Itoa(int(evt.Seq)) 32 b.firehoseLk.Unlock() 33 34 if b.backfillComplete { 35 return b.bf.HandleEvent(ctx, evt) 36 } 37 38 job, err := b.bf.Store.GetJob(ctx, evt.Repo) 39 if job == nil { 40 if errors.Is(err, backfill.ErrJobNotFound) { 41 return nil 42 } else { 43 return fmt.Errorf("error getting job: %w", err) 44 } 45 } else { 46 return b.bf.HandleEvent(ctx, evt) 47 } 48} 49 50type handleOpCreateUpdate func(context.Context, string, string, string, *[]byte, *cid.Cid) error 51type handleOpDelete func(context.Context, string, string, string) error 52 53func (b *Backend) HandleCreateOp(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error { 54 if !strings.HasPrefix(path, "app.bsky.feed.generator/") { 55 return nil 56 } 57 58 sl := slog.With("source", "commitHandler") 59 60 var out appbsky.FeedGenerator 61 if err := out.UnmarshalCBOR(bytes.NewReader(*rec)); err != nil { 62 sl.Error("failed to unmarshal record", "err", err) 63 return fmt.Errorf("failed to unmarshal record: %w", err) 64 } 65 66 feedgen := &FeedGenerator{ 67 AtUri: fmt.Sprintf("at://%s/%s", repo, path), 68 DisplayName: out.DisplayName, 69 FeedService: out.Did, 70 CreatedAt: out.CreatedAt, 71 Description: out.Description, 72 ContentMode: out.ContentMode, 73 AcceptsInteractions: out.AcceptsInteractions, 74 } 75 76 if err := b.data.Model(&FeedGenerator{}).Create(feedgen).Error; err != nil { 77 return fmt.Errorf("error adding feedgen to database: %w", err) 78 } 79 80 return nil 81} 82 83func (b *Backend) HandleUpdateOp(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error { 84 return nil 85} 86 87func (b *Backend) HandleDeleteOp(ctx context.Context, repo, rev, path string) error { 88 return nil 89}