forked from tangled.org/core
this repo has no description
at master 4.1 kB view raw
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "strings" 11 12 "github.com/bluesky-social/jetstream/pkg/models" 13 "tangled.sh/tangled.sh/core/api/tangled" 14 "tangled.sh/tangled.sh/core/knotserver/db" 15 "tangled.sh/tangled.sh/core/log" 16) 17 18func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error { 19 l := log.FromContext(ctx) 20 pk := db.PublicKey{ 21 Did: did, 22 PublicKey: record, 23 } 24 if err := h.db.AddPublicKey(pk); err != nil { 25 l.Error("failed to add public key", "error", err) 26 return fmt.Errorf("failed to add public key: %w", err) 27 } 28 l.Info("added public key from firehose", "did", did) 29 return nil 30} 31 32func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error { 33 l := log.FromContext(ctx) 34 35 if record.Domain != h.c.Server.Hostname { 36 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 37 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 38 } 39 40 ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite") 41 if err != nil || !ok { 42 l.Error("failed to add member", "did", did) 43 return fmt.Errorf("failed to enforce permissions: %w", err) 44 } 45 46 if err := h.e.AddMember(ThisServer, record.Member); err != nil { 47 l.Error("failed to add member", "error", err) 48 return fmt.Errorf("failed to add member: %w", err) 49 } 50 l.Info("added member from firehose", "member", record.Member) 51 52 if err := h.db.AddDid(did); err != nil { 53 l.Error("failed to add did", "error", err) 54 return fmt.Errorf("failed to add did: %w", err) 55 } 56 h.jc.AddDid(did) 57 58 if err := h.fetchAndAddKeys(ctx, did); err != nil { 59 return fmt.Errorf("failed to fetch and add keys: %w", err) 60 } 61 62 return nil 63} 64 65func (h *Handle) fetchAndAddKeys(ctx context.Context, did string) error { 66 l := log.FromContext(ctx) 67 68 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 69 if err != nil { 70 l.Error("error building endpoint url", "did", did, "error", err.Error()) 71 return fmt.Errorf("error building endpoint url: %w", err) 72 } 73 74 resp, err := http.Get(keysEndpoint) 75 if err != nil { 76 l.Error("error getting keys", "did", did, "error", err) 77 return fmt.Errorf("error getting keys: %w", err) 78 } 79 defer resp.Body.Close() 80 81 if resp.StatusCode == http.StatusNotFound { 82 l.Info("no keys found for did", "did", did) 83 return nil 84 } 85 86 plaintext, err := io.ReadAll(resp.Body) 87 if err != nil { 88 l.Error("error reading response body", "error", err) 89 return fmt.Errorf("error reading response body: %w", err) 90 } 91 92 for _, key := range strings.Split(string(plaintext), "\n") { 93 if key == "" { 94 continue 95 } 96 pk := db.PublicKey{ 97 Did: did, 98 } 99 pk.Key = key 100 if err := h.db.AddPublicKey(pk); err != nil { 101 l.Error("failed to add public key", "error", err) 102 return fmt.Errorf("failed to add public key: %w", err) 103 } 104 } 105 return nil 106} 107 108func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 109 did := event.Did 110 if event.Kind != models.EventKindCommit { 111 return nil 112 } 113 114 var err error 115 defer func() { 116 eventTime := event.TimeUS 117 lastTimeUs := eventTime + 1 118 fmt.Println("lastTimeUs", lastTimeUs) 119 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 120 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 121 } 122 }() 123 124 raw := json.RawMessage(event.Commit.Record) 125 126 switch event.Commit.Collection { 127 case tangled.PublicKeyNSID: 128 var record tangled.PublicKey 129 if err := json.Unmarshal(raw, &record); err != nil { 130 return fmt.Errorf("failed to unmarshal record: %w", err) 131 } 132 if err := h.processPublicKey(ctx, did, record); err != nil { 133 return fmt.Errorf("failed to process public key: %w", err) 134 } 135 136 case tangled.KnotMemberNSID: 137 var record tangled.KnotMember 138 if err := json.Unmarshal(raw, &record); err != nil { 139 return fmt.Errorf("failed to unmarshal record: %w", err) 140 } 141 if err := h.processKnotMember(ctx, did, record); err != nil { 142 return fmt.Errorf("failed to process knot member: %w", err) 143 } 144 } 145 146 return err 147}