an app.bsky.* indexer
1package main
2
3import (
4 "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "log/slog"
9 "strconv"
10 "strings"
11
12 comatproto "github.com/bluesky-social/indigo/api/atproto"
13 appbsky "github.com/bluesky-social/indigo/api/bsky"
14 "github.com/bluesky-social/indigo/backfill"
15 "github.com/ipfs/go-cid"
16)
17
18type commitHandler func(context.Context, *comatproto.SyncSubscribeRepos_Commit) error
19
20func (b *Backend) RepoCommitHandler(
21 ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit,
22) error {
23 select {
24 case <-ctx.Done():
25 return nil
26 default:
27 //
28 }
29
30 b.firehoseLk.Lock()
31 b.firehoseSeq = strconv.Itoa(int(evt.Seq))
32 b.firehoseLk.Unlock()
33
34 if b.backfillComplete {
35 return b.bf.HandleEvent(ctx, evt)
36 }
37
38 job, err := b.bf.Store.GetJob(ctx, evt.Repo)
39 if job == nil {
40 if errors.Is(err, backfill.ErrJobNotFound) {
41 return nil
42 } else {
43 return fmt.Errorf("error getting job: %w", err)
44 }
45 } else {
46 return b.bf.HandleEvent(ctx, evt)
47 }
48}
49
50type handleOpCreateUpdate func(context.Context, string, string, string, *[]byte, *cid.Cid) error
51type handleOpDelete func(context.Context, string, string, string) error
52
53func (b *Backend) HandleCreateOp(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
54 if !strings.HasPrefix(path, "app.bsky.feed.generator/") {
55 return nil
56 }
57
58 sl := slog.With("source", "commitHandler")
59
60 var out appbsky.FeedGenerator
61 if err := out.UnmarshalCBOR(bytes.NewReader(*rec)); err != nil {
62 sl.Error("failed to unmarshal record", "err", err)
63 return fmt.Errorf("failed to unmarshal record: %w", err)
64 }
65
66 feedgen := &FeedGenerator{
67 AtUri: fmt.Sprintf("at://%s/%s", repo, path),
68 DisplayName: out.DisplayName,
69 FeedService: out.Did,
70 CreatedAt: out.CreatedAt,
71 Description: out.Description,
72 ContentMode: out.ContentMode,
73 AcceptsInteractions: out.AcceptsInteractions,
74 }
75
76 if err := b.data.Model(&FeedGenerator{}).Create(feedgen).Error; err != nil {
77 return fmt.Errorf("error adding feedgen to database: %w", err)
78 }
79
80 return nil
81}
82
83func (b *Backend) HandleUpdateOp(ctx context.Context, repo, rev, path string, rec *[]byte, cid *cid.Cid) error {
84 return nil
85}
86
87func (b *Backend) HandleDeleteOp(ctx context.Context, repo, rev, path string) error {
88 return nil
89}