An atproto PDS written in Go
at main 2.8 kB view raw
1package main 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "log/slog" 8 "net/http" 9 "net/url" 10 "strings" 11 12 "github.com/bluesky-social/indigo/api/atproto" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/indigo/events" 15 "github.com/bluesky-social/indigo/events/schedulers/parallel" 16 lexutil "github.com/bluesky-social/indigo/lex/util" 17 "github.com/bluesky-social/indigo/repo" 18 "github.com/bluesky-social/indigo/repomgr" 19 "github.com/gorilla/websocket" 20) 21 22func main() { 23 runFirehoseConsumer("ws://localhost:8080") 24} 25 26func runFirehoseConsumer(relayHost string) error { 27 dialer := websocket.DefaultDialer 28 u, err := url.Parse("wss://cocoon.hailey.at") 29 if err != nil { 30 return fmt.Errorf("invalid relayHost: %w", err) 31 } 32 33 u.Path = "xrpc/com.atproto.sync.subscribeRepos" 34 conn, _, err := dialer.Dial(u.String(), http.Header{ 35 "User-Agent": []string{fmt.Sprintf("hot-topic/0.0.0")}, 36 }) 37 if err != nil { 38 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 39 } 40 41 rsc := &events.RepoStreamCallbacks{ 42 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 43 fmt.Println(evt.Repo) 44 return handleRepoCommit(evt) 45 }, 46 RepoIdentity: func(evt *atproto.SyncSubscribeRepos_Identity) error { 47 fmt.Println(evt.Did, evt.Handle) 48 return nil 49 }, 50 } 51 52 var scheduler events.Scheduler 53 parallelism := 700 54 scheduler = parallel.NewScheduler(parallelism, 1000, relayHost, rsc.EventHandler) 55 56 return events.HandleRepoStream(context.TODO(), conn, scheduler, slog.Default()) 57} 58 59func splitRepoPath(path string) (syntax.NSID, syntax.RecordKey, error) { 60 parts := strings.SplitN(path, "/", 3) 61 if len(parts) != 2 { 62 return "", "", fmt.Errorf("invalid record path: %s", path) 63 } 64 collection, err := syntax.ParseNSID(parts[0]) 65 if err != nil { 66 return "", "", err 67 } 68 rkey, err := syntax.ParseRecordKey(parts[1]) 69 if err != nil { 70 return "", "", err 71 } 72 return collection, rkey, nil 73} 74 75func handleRepoCommit(evt *atproto.SyncSubscribeRepos_Commit) error { 76 if evt.TooBig { 77 return nil 78 } 79 80 did, err := syntax.ParseDID(evt.Repo) 81 if err != nil { 82 panic(err) 83 } 84 85 rr, err := repo.ReadRepoFromCar(context.TODO(), bytes.NewReader(evt.Blocks)) 86 if err != nil { 87 panic(err) 88 } 89 90 for _, op := range evt.Ops { 91 collection, rkey, err := splitRepoPath(op.Path) 92 if err != nil { 93 panic(err) 94 } 95 96 ek := repomgr.EventKind(op.Action) 97 98 go func() { 99 switch ek { 100 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 101 rc, recordCBOR, err := rr.GetRecordBytes(context.TODO(), op.Path) 102 if err != nil { 103 panic(err) 104 } 105 106 if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid { 107 panic("nocid") 108 } 109 110 _ = collection 111 _ = rkey 112 _ = recordCBOR 113 _ = did 114 115 } 116 }() 117 } 118 119 return nil 120}