1package main
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "log/slog"
8 "net/http"
9 "net/url"
10 "strings"
11
12 "github.com/bluesky-social/indigo/api/atproto"
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/bluesky-social/indigo/events"
15 "github.com/bluesky-social/indigo/events/schedulers/parallel"
16 lexutil "github.com/bluesky-social/indigo/lex/util"
17 "github.com/bluesky-social/indigo/repo"
18 "github.com/bluesky-social/indigo/repomgr"
19 "github.com/gorilla/websocket"
20)
21
22func main() {
23 runFirehoseConsumer("ws://localhost:8080")
24}
25
26func runFirehoseConsumer(relayHost string) error {
27 dialer := websocket.DefaultDialer
28 u, err := url.Parse("wss://cocoon.hailey.at")
29 if err != nil {
30 return fmt.Errorf("invalid relayHost: %w", err)
31 }
32
33 u.Path = "xrpc/com.atproto.sync.subscribeRepos"
34 conn, _, err := dialer.Dial(u.String(), http.Header{
35 "User-Agent": []string{fmt.Sprintf("hot-topic/0.0.0")},
36 })
37 if err != nil {
38 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err)
39 }
40
41 rsc := &events.RepoStreamCallbacks{
42 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
43 fmt.Println(evt.Repo)
44 return handleRepoCommit(evt)
45 },
46 RepoIdentity: func(evt *atproto.SyncSubscribeRepos_Identity) error {
47 fmt.Println(evt.Did, evt.Handle)
48 return nil
49 },
50 }
51
52 var scheduler events.Scheduler
53 parallelism := 700
54 scheduler = parallel.NewScheduler(parallelism, 1000, relayHost, rsc.EventHandler)
55
56 return events.HandleRepoStream(context.TODO(), conn, scheduler, slog.Default())
57}
58
59func splitRepoPath(path string) (syntax.NSID, syntax.RecordKey, error) {
60 parts := strings.SplitN(path, "/", 3)
61 if len(parts) != 2 {
62 return "", "", fmt.Errorf("invalid record path: %s", path)
63 }
64 collection, err := syntax.ParseNSID(parts[0])
65 if err != nil {
66 return "", "", err
67 }
68 rkey, err := syntax.ParseRecordKey(parts[1])
69 if err != nil {
70 return "", "", err
71 }
72 return collection, rkey, nil
73}
74
75func handleRepoCommit(evt *atproto.SyncSubscribeRepos_Commit) error {
76 if evt.TooBig {
77 return nil
78 }
79
80 did, err := syntax.ParseDID(evt.Repo)
81 if err != nil {
82 panic(err)
83 }
84
85 rr, err := repo.ReadRepoFromCar(context.TODO(), bytes.NewReader(evt.Blocks))
86 if err != nil {
87 panic(err)
88 }
89
90 for _, op := range evt.Ops {
91 collection, rkey, err := splitRepoPath(op.Path)
92 if err != nil {
93 panic(err)
94 }
95
96 ek := repomgr.EventKind(op.Action)
97
98 go func() {
99 switch ek {
100 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
101 rc, recordCBOR, err := rr.GetRecordBytes(context.TODO(), op.Path)
102 if err != nil {
103 panic(err)
104 }
105
106 if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid {
107 panic("nocid")
108 }
109
110 _ = collection
111 _ = rkey
112 _ = recordCBOR
113 _ = did
114
115 }
116 }()
117 }
118
119 return nil
120}