forked from
tangled.org/core
Monorepo for Tangled — https://tangled.org
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/bluesky-social/jetstream/pkg/client"
15 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
16 "github.com/bluesky-social/jetstream/pkg/models"
17 "github.com/sotangled/tangled/api/tangled"
18 "github.com/sotangled/tangled/knotserver/db"
19 "github.com/sotangled/tangled/log"
20)
21
22type JetstreamClient struct {
23 cfg *client.ClientConfig
24 client *client.Client
25 reconnectCh chan struct{}
26 mu sync.RWMutex
27}
28
29func (h *Handle) StartJetstream(ctx context.Context) error {
30 l := h.l
31 ctx = log.IntoContext(ctx, l)
32 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID}
33 dids := []string{}
34
35 cfg := client.DefaultClientConfig()
36 cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe"
37 cfg.WantedCollections = collections
38 cfg.WantedDids = dids
39
40 sched := sequential.NewScheduler("knotserver", l, h.processMessages)
41
42 client, err := client.NewClient(cfg, l, sched)
43 if err != nil {
44 l.Error("failed to create jetstream client", "error", err)
45 }
46
47 jc := &JetstreamClient{
48 cfg: cfg,
49 client: client,
50 reconnectCh: make(chan struct{}, 1),
51 }
52
53 h.jc = jc
54
55 go func() {
56 lastTimeUs := h.getLastTimeUs(ctx)
57 for len(h.jc.cfg.WantedDids) == 0 {
58 time.Sleep(time.Second)
59 }
60 h.connectAndRead(ctx, &lastTimeUs)
61 }()
62 return nil
63}
64
65func (h *Handle) connectAndRead(ctx context.Context, cursor *int64) {
66 l := log.FromContext(ctx)
67 for {
68 select {
69 case <-h.jc.reconnectCh:
70 l.Info("(re)connecting jetstream client")
71 h.jc.client.Scheduler.Shutdown()
72 if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil {
73 l.Error("error reading jetstream", "error", err)
74 }
75 default:
76 if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil {
77 l.Error("error reading jetstream", "error", err)
78 }
79 }
80 }
81}
82
83func (j *JetstreamClient) AddDid(did string) {
84 j.mu.Lock()
85 j.cfg.WantedDids = append(j.cfg.WantedDids, did)
86 j.mu.Unlock()
87 j.reconnectCh <- struct{}{}
88}
89
90func (j *JetstreamClient) UpdateDids(dids []string) {
91 j.mu.Lock()
92 j.cfg.WantedDids = dids
93 j.mu.Unlock()
94 j.reconnectCh <- struct{}{}
95}
96
97func (h *Handle) getLastTimeUs(ctx context.Context) int64 {
98 l := log.FromContext(ctx)
99 lastTimeUs, err := h.db.GetLastTimeUs()
100 if err != nil {
101 l.Warn("couldn't get last time us, starting from now", "error", err)
102 lastTimeUs = time.Now().UnixMicro()
103 err = h.db.SaveLastTimeUs(lastTimeUs)
104 if err != nil {
105 l.Error("failed to save last time us")
106 }
107 }
108
109 // If last time is older than a week, start from now
110 if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 {
111 lastTimeUs = time.Now().UnixMicro()
112 l.Warn("last time us is older than a week. discarding that and starting from now")
113 err = h.db.SaveLastTimeUs(lastTimeUs)
114 if err != nil {
115 l.Error("failed to save last time us")
116 }
117 }
118
119 l.Info("found last time_us", "time_us", lastTimeUs)
120 return lastTimeUs
121}
122
123func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
124 l := log.FromContext(ctx)
125 pk := db.PublicKey{
126 Did: did,
127 PublicKey: record,
128 }
129 if err := h.db.AddPublicKey(pk); err != nil {
130 l.Error("failed to add public key", "error", err)
131 return fmt.Errorf("failed to add public key: %w", err)
132 }
133 l.Info("added public key from firehose", "did", did)
134 return nil
135}
136
137func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
138 l := log.FromContext(ctx)
139
140 if record.Domain != h.c.Server.Hostname {
141 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
142 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
143 }
144
145 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
146 if err != nil || !ok {
147 l.Error("failed to add member", "did", did)
148 return fmt.Errorf("failed to enforce permissions: %w", err)
149 }
150
151 l.Info("adding member")
152 if err := h.e.AddMember(ThisServer, record.Member); err != nil {
153 l.Error("failed to add member", "error", err)
154 return fmt.Errorf("failed to add member: %w", err)
155 }
156 l.Info("added member from firehose", "member", record.Member)
157
158 if err := h.db.AddDid(did); err != nil {
159 l.Error("failed to add did", "error", err)
160 return fmt.Errorf("failed to add did: %w", err)
161 }
162
163 if err := h.fetchAndAddKeys(ctx, did); err != nil {
164 return fmt.Errorf("failed to fetch and add keys: %w", err)
165 }
166
167 h.jc.UpdateDids([]string{did})
168 return nil
169}
170
171func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
172 l := log.FromContext(ctx)
173
174 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
175 if err != nil {
176 l.Error("error building endpoint url", "did", did, "error", err.Error())
177 return fmt.Errorf("error building endpoint url: %w", err)
178 }
179
180 resp, err := http.Get(keysEndpoint)
181 if err != nil {
182 l.Error("error getting keys", "did", did, "error", err)
183 return fmt.Errorf("error getting keys: %w", err)
184 }
185 defer resp.Body.Close()
186
187 if resp.StatusCode == http.StatusNotFound {
188 l.Info("no keys found for did", "did", did)
189 return nil
190 }
191
192 plaintext, err := io.ReadAll(resp.Body)
193 if err != nil {
194 l.Error("error reading response body", "error", err)
195 return fmt.Errorf("error reading response body: %w", err)
196 }
197
198 for _, key := range strings.Split(string(plaintext), "\n") {
199 if key == "" {
200 continue
201 }
202 pk := db.PublicKey{
203 Did: did,
204 }
205 pk.Key = key
206 if err := h.db.AddPublicKey(pk); err != nil {
207 l.Error("failed to add public key", "error", err)
208 return fmt.Errorf("failed to add public key: %w", err)
209 }
210 }
211 return nil
212}
213
214func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
215 did := event.Did
216
217 raw := json.RawMessage(event.Commit.Record)
218
219 switch event.Commit.Collection {
220 case tangled.PublicKeyNSID:
221 var record tangled.PublicKey
222 if err := json.Unmarshal(raw, &record); err != nil {
223 return fmt.Errorf("failed to unmarshal record: %w", err)
224 }
225 if err := h.processPublicKey(ctx, did, record); err != nil {
226 return fmt.Errorf("failed to process public key: %w", err)
227 }
228
229 case tangled.KnotMemberNSID:
230 var record tangled.KnotMember
231 if err := json.Unmarshal(raw, &record); err != nil {
232 return fmt.Errorf("failed to unmarshal record: %w", err)
233 }
234 if err := h.processKnotMember(ctx, did, record); err != nil {
235 return fmt.Errorf("failed to process knot member: %w", err)
236 }
237 }
238
239 lastTimeUs := event.TimeUS
240 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
241 return fmt.Errorf("failed to save last time us: %w", err)
242 }
243
244 return nil
245}