forked from tangled.org/core
Monorepo for Tangled — https://tangled.org
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "strings" 11 "sync" 12 "time" 13 14 "github.com/bluesky-social/jetstream/pkg/client" 15 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 16 "github.com/bluesky-social/jetstream/pkg/models" 17 "github.com/sotangled/tangled/api/tangled" 18 "github.com/sotangled/tangled/knotserver/db" 19 "github.com/sotangled/tangled/log" 20) 21 22type JetstreamClient struct { 23 cfg *client.ClientConfig 24 client *client.Client 25 reconnectCh chan struct{} 26 mu sync.RWMutex 27} 28 29func (h *Handle) StartJetstream(ctx context.Context) error { 30 l := h.l 31 ctx = log.IntoContext(ctx, l) 32 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID} 33 dids := []string{} 34 35 cfg := client.DefaultClientConfig() 36 cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe" 37 cfg.WantedCollections = collections 38 cfg.WantedDids = dids 39 40 sched := sequential.NewScheduler("knotserver", l, h.processMessages) 41 42 client, err := client.NewClient(cfg, l, sched) 43 if err != nil { 44 l.Error("failed to create jetstream client", "error", err) 45 } 46 47 jc := &JetstreamClient{ 48 cfg: cfg, 49 client: client, 50 reconnectCh: make(chan struct{}, 1), 51 } 52 53 h.jc = jc 54 55 go func() { 56 lastTimeUs := h.getLastTimeUs(ctx) 57 for len(h.jc.cfg.WantedDids) == 0 { 58 time.Sleep(time.Second) 59 } 60 h.connectAndRead(ctx, &lastTimeUs) 61 }() 62 return nil 63} 64 65func (h *Handle) connectAndRead(ctx context.Context, cursor *int64) { 66 l := log.FromContext(ctx) 67 for { 68 select { 69 case <-h.jc.reconnectCh: 70 l.Info("(re)connecting jetstream client") 71 h.jc.client.Scheduler.Shutdown() 72 if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil { 73 l.Error("error reading jetstream", "error", err) 74 } 75 default: 76 if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil { 77 l.Error("error reading jetstream", "error", err) 78 } 79 } 80 } 81} 82 83func (j *JetstreamClient) AddDid(did string) { 84 j.mu.Lock() 85 j.cfg.WantedDids = append(j.cfg.WantedDids, did) 86 j.mu.Unlock() 87 j.reconnectCh <- struct{}{} 88} 89 90func (j *JetstreamClient) UpdateDids(dids []string) { 91 j.mu.Lock() 92 j.cfg.WantedDids = dids 93 j.mu.Unlock() 94 j.reconnectCh <- struct{}{} 95} 96 97func (h *Handle) getLastTimeUs(ctx context.Context) int64 { 98 l := log.FromContext(ctx) 99 lastTimeUs, err := h.db.GetLastTimeUs() 100 if err != nil { 101 l.Warn("couldn't get last time us, starting from now", "error", err) 102 lastTimeUs = time.Now().UnixMicro() 103 err = h.db.SaveLastTimeUs(lastTimeUs) 104 if err != nil { 105 l.Error("failed to save last time us") 106 } 107 } 108 109 // If last time is older than a week, start from now 110 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 { 111 lastTimeUs = time.Now().UnixMicro() 112 l.Warn("last time us is older than a week. discarding that and starting from now") 113 err = h.db.SaveLastTimeUs(lastTimeUs) 114 if err != nil { 115 l.Error("failed to save last time us") 116 } 117 } 118 119 l.Info("found last time_us", "time_us", lastTimeUs) 120 return lastTimeUs 121} 122 123func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error { 124 l := log.FromContext(ctx) 125 pk := db.PublicKey{ 126 Did: did, 127 PublicKey: record, 128 } 129 if err := h.db.AddPublicKey(pk); err != nil { 130 l.Error("failed to add public key", "error", err) 131 return fmt.Errorf("failed to add public key: %w", err) 132 } 133 l.Info("added public key from firehose", "did", did) 134 return nil 135} 136 137func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error { 138 l := log.FromContext(ctx) 139 140 if record.Domain != h.c.Server.Hostname { 141 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 142 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 143 } 144 145 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite") 146 if err != nil || !ok { 147 l.Error("failed to add member", "did", did) 148 return fmt.Errorf("failed to enforce permissions: %w", err) 149 } 150 151 l.Info("adding member") 152 if err := h.e.AddMember(ThisServer, record.Member); err != nil { 153 l.Error("failed to add member", "error", err) 154 return fmt.Errorf("failed to add member: %w", err) 155 } 156 l.Info("added member from firehose", "member", record.Member) 157 158 if err := h.db.AddDid(did); err != nil { 159 l.Error("failed to add did", "error", err) 160 return fmt.Errorf("failed to add did: %w", err) 161 } 162 163 if err := h.fetchAndAddKeys(ctx, did); err != nil { 164 return fmt.Errorf("failed to fetch and add keys: %w", err) 165 } 166 167 h.jc.UpdateDids([]string{did}) 168 return nil 169} 170 171func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error { 172 l := log.FromContext(ctx) 173 174 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 175 if err != nil { 176 l.Error("error building endpoint url", "did", did, "error", err.Error()) 177 return fmt.Errorf("error building endpoint url: %w", err) 178 } 179 180 resp, err := http.Get(keysEndpoint) 181 if err != nil { 182 l.Error("error getting keys", "did", did, "error", err) 183 return fmt.Errorf("error getting keys: %w", err) 184 } 185 defer resp.Body.Close() 186 187 if resp.StatusCode == http.StatusNotFound { 188 l.Info("no keys found for did", "did", did) 189 return nil 190 } 191 192 plaintext, err := io.ReadAll(resp.Body) 193 if err != nil { 194 l.Error("error reading response body", "error", err) 195 return fmt.Errorf("error reading response body: %w", err) 196 } 197 198 for _, key := range strings.Split(string(plaintext), "\n") { 199 if key == "" { 200 continue 201 } 202 pk := db.PublicKey{ 203 Did: did, 204 } 205 pk.Key = key 206 if err := h.db.AddPublicKey(pk); err != nil { 207 l.Error("failed to add public key", "error", err) 208 return fmt.Errorf("failed to add public key: %w", err) 209 } 210 } 211 return nil 212} 213 214func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 215 did := event.Did 216 217 raw := json.RawMessage(event.Commit.Record) 218 219 switch event.Commit.Collection { 220 case tangled.PublicKeyNSID: 221 var record tangled.PublicKey 222 if err := json.Unmarshal(raw, &record); err != nil { 223 return fmt.Errorf("failed to unmarshal record: %w", err) 224 } 225 if err := h.processPublicKey(ctx, did, record); err != nil { 226 return fmt.Errorf("failed to process public key: %w", err) 227 } 228 229 case tangled.KnotMemberNSID: 230 var record tangled.KnotMember 231 if err := json.Unmarshal(raw, &record); err != nil { 232 return fmt.Errorf("failed to unmarshal record: %w", err) 233 } 234 if err := h.processKnotMember(ctx, did, record); err != nil { 235 return fmt.Errorf("failed to process knot member: %w", err) 236 } 237 } 238 239 lastTimeUs := event.TimeUS 240 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 241 return fmt.Errorf("failed to save last time us: %w", err) 242 } 243 244 return nil 245}