1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "strings"
11
12 "github.com/bluesky-social/jetstream/pkg/models"
13 "github.com/sotangled/tangled/api/tangled"
14 "github.com/sotangled/tangled/knotserver/db"
15 "github.com/sotangled/tangled/log"
16)
17
18func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error {
19 l := log.FromContext(ctx)
20 pk := db.PublicKey{
21 Did: did,
22 PublicKey: record,
23 }
24 if err := h.db.AddPublicKey(pk); err != nil {
25 l.Error("failed to add public key", "error", err)
26 return fmt.Errorf("failed to add public key: %w", err)
27 }
28 l.Info("added public key from firehose", "did", did)
29 return nil
30}
31
32func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error {
33 l := log.FromContext(ctx)
34
35 if record.Domain != h.c.Server.Hostname {
36 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
37 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
38 }
39
40 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite")
41 if err != nil || !ok {
42 l.Error("failed to add member", "did", did)
43 return fmt.Errorf("failed to enforce permissions: %w", err)
44 }
45
46 if err := h.e.AddMember(ThisServer, record.Member); err != nil {
47 l.Error("failed to add member", "error", err)
48 return fmt.Errorf("failed to add member: %w", err)
49 }
50 l.Info("added member from firehose", "member", record.Member)
51
52 if err := h.db.AddDid(did); err != nil {
53 l.Error("failed to add did", "error", err)
54 return fmt.Errorf("failed to add did: %w", err)
55 }
56
57 if err := h.fetchAndAddKeys(ctx, did); err != nil {
58 return fmt.Errorf("failed to fetch and add keys: %w", err)
59 }
60
61 return nil
62}
63
64func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error {
65 l := log.FromContext(ctx)
66
67 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
68 if err != nil {
69 l.Error("error building endpoint url", "did", did, "error", err.Error())
70 return fmt.Errorf("error building endpoint url: %w", err)
71 }
72
73 resp, err := http.Get(keysEndpoint)
74 if err != nil {
75 l.Error("error getting keys", "did", did, "error", err)
76 return fmt.Errorf("error getting keys: %w", err)
77 }
78 defer resp.Body.Close()
79
80 if resp.StatusCode == http.StatusNotFound {
81 l.Info("no keys found for did", "did", did)
82 return nil
83 }
84
85 plaintext, err := io.ReadAll(resp.Body)
86 if err != nil {
87 l.Error("error reading response body", "error", err)
88 return fmt.Errorf("error reading response body: %w", err)
89 }
90
91 for _, key := range strings.Split(string(plaintext), "\n") {
92 if key == "" {
93 continue
94 }
95 pk := db.PublicKey{
96 Did: did,
97 }
98 pk.Key = key
99 if err := h.db.AddPublicKey(pk); err != nil {
100 l.Error("failed to add public key", "error", err)
101 return fmt.Errorf("failed to add public key: %w", err)
102 }
103 }
104 return nil
105}
106
107func (h *Handle) processMessages(ctx context.Context, event *models.Event) error {
108 did := event.Did
109 if event.Kind != models.EventKindCommit {
110 return nil
111 }
112
113 var err error
114 defer func() {
115 eventTime := event.TimeUS
116 lastTimeUs := eventTime + 1
117 fmt.Println("lastTimeUs", lastTimeUs)
118 if err := h.db.UpdateLastTimeUs(lastTimeUs); err != nil {
119 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
120 }
121 h.jc.UpdateDids([]string{did})
122 }()
123
124 raw := json.RawMessage(event.Commit.Record)
125
126 switch event.Commit.Collection {
127 case tangled.PublicKeyNSID:
128 var record tangled.PublicKey
129 if err := json.Unmarshal(raw, &record); err != nil {
130 return fmt.Errorf("failed to unmarshal record: %w", err)
131 }
132 if err := h.processPublicKey(ctx, did, record); err != nil {
133 return fmt.Errorf("failed to process public key: %w", err)
134 }
135
136 case tangled.KnotMemberNSID:
137 var record tangled.KnotMember
138 if err := json.Unmarshal(raw, &record); err != nil {
139 return fmt.Errorf("failed to unmarshal record: %w", err)
140 }
141 if err := h.processKnotMember(ctx, did, record); err != nil {
142 return fmt.Errorf("failed to process knot member: %w", err)
143 }
144 }
145
146 return err
147}