1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/bluesky-social/jetstream/pkg/models"
12 "github.com/go-git/go-git/v5/plumbing"
13 "github.com/ipfs/go-cid"
14 "tangled.sh/tangled.sh/core/api/tangled"
15 "tangled.sh/tangled.sh/core/appview/db"
16 "tangled.sh/tangled.sh/core/rbac"
17)
18
19type Ingester func(ctx context.Context, e *models.Event) error
20
21func Ingest(d db.DbWrapper, enforcer *rbac.Enforcer) Ingester {
22 return func(ctx context.Context, e *models.Event) error {
23 var err error
24 defer func() {
25 eventTime := e.TimeUS
26 lastTimeUs := eventTime + 1
27 if err := d.SaveLastTimeUs(lastTimeUs); err != nil {
28 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
29 }
30 }()
31
32 if e.Kind != models.EventKindCommit {
33 return nil
34 }
35
36 switch e.Commit.Collection {
37 case tangled.GraphFollowNSID:
38 ingestFollow(&d, e)
39 case tangled.FeedStarNSID:
40 ingestStar(&d, e)
41 case tangled.PublicKeyNSID:
42 ingestPublicKey(&d, e)
43 case tangled.RepoArtifactNSID:
44 ingestArtifact(&d, e, enforcer)
45 case tangled.ActorProfileNSID:
46 ingestProfile(&d, e)
47 }
48
49 return err
50 }
51}
52
53func ingestStar(d *db.DbWrapper, e *models.Event) error {
54 var err error
55 did := e.Did
56
57 switch e.Commit.Operation {
58 case models.CommitOperationCreate, models.CommitOperationUpdate:
59 var subjectUri syntax.ATURI
60
61 raw := json.RawMessage(e.Commit.Record)
62 record := tangled.FeedStar{}
63 err := json.Unmarshal(raw, &record)
64 if err != nil {
65 log.Println("invalid record")
66 return err
67 }
68
69 subjectUri, err = syntax.ParseATURI(record.Subject)
70 if err != nil {
71 log.Println("invalid record")
72 return err
73 }
74 err = db.AddStar(d, did, subjectUri, e.Commit.RKey)
75 case models.CommitOperationDelete:
76 err = db.DeleteStarByRkey(d, did, e.Commit.RKey)
77 }
78
79 if err != nil {
80 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
81 }
82
83 return nil
84}
85
86func ingestFollow(d *db.DbWrapper, e *models.Event) error {
87 var err error
88 did := e.Did
89
90 switch e.Commit.Operation {
91 case models.CommitOperationCreate, models.CommitOperationUpdate:
92 raw := json.RawMessage(e.Commit.Record)
93 record := tangled.GraphFollow{}
94 err = json.Unmarshal(raw, &record)
95 if err != nil {
96 log.Println("invalid record")
97 return err
98 }
99
100 subjectDid := record.Subject
101 err = db.AddFollow(d, did, subjectDid, e.Commit.RKey)
102 case models.CommitOperationDelete:
103 err = db.DeleteFollowByRkey(d, did, e.Commit.RKey)
104 }
105
106 if err != nil {
107 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
108 }
109
110 return nil
111}
112
113func ingestPublicKey(d *db.DbWrapper, e *models.Event) error {
114 did := e.Did
115 var err error
116
117 switch e.Commit.Operation {
118 case models.CommitOperationCreate, models.CommitOperationUpdate:
119 log.Println("processing add of pubkey")
120 raw := json.RawMessage(e.Commit.Record)
121 record := tangled.PublicKey{}
122 err = json.Unmarshal(raw, &record)
123 if err != nil {
124 log.Printf("invalid record: %s", err)
125 return err
126 }
127
128 name := record.Name
129 key := record.Key
130 err = db.AddPublicKey(d, did, name, key, e.Commit.RKey)
131 case models.CommitOperationDelete:
132 log.Println("processing delete of pubkey")
133 err = db.DeletePublicKeyByRkey(d, did, e.Commit.RKey)
134 }
135
136 if err != nil {
137 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
138 }
139
140 return nil
141}
142
143func ingestArtifact(d *db.DbWrapper, e *models.Event, enforcer *rbac.Enforcer) error {
144 did := e.Did
145 var err error
146
147 switch e.Commit.Operation {
148 case models.CommitOperationCreate, models.CommitOperationUpdate:
149 raw := json.RawMessage(e.Commit.Record)
150 record := tangled.RepoArtifact{}
151 err = json.Unmarshal(raw, &record)
152 if err != nil {
153 log.Printf("invalid record: %s", err)
154 return err
155 }
156
157 repoAt, err := syntax.ParseATURI(record.Repo)
158 if err != nil {
159 return err
160 }
161
162 repo, err := db.GetRepoByAtUri(d, repoAt.String())
163 if err != nil {
164 return err
165 }
166
167 ok, err := enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
168 if err != nil || !ok {
169 return err
170 }
171
172 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
173 if err != nil {
174 createdAt = time.Now()
175 }
176
177 artifact := db.Artifact{
178 Did: did,
179 Rkey: e.Commit.RKey,
180 RepoAt: repoAt,
181 Tag: plumbing.Hash(record.Tag),
182 CreatedAt: createdAt,
183 BlobCid: cid.Cid(record.Artifact.Ref),
184 Name: record.Name,
185 Size: uint64(record.Artifact.Size),
186 MimeType: record.Artifact.MimeType,
187 }
188
189 err = db.AddArtifact(d, artifact)
190 case models.CommitOperationDelete:
191 err = db.DeleteArtifact(d, db.Filter("did", did), db.Filter("rkey", e.Commit.RKey))
192 }
193
194 if err != nil {
195 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
196 }
197
198 return nil
199}
200
201func ingestProfile(d *db.DbWrapper, e *models.Event) error {
202 did := e.Did
203 var err error
204
205 if e.Commit.RKey != "self" {
206 return fmt.Errorf("ingestProfile only ingests `self` record")
207 }
208
209 switch e.Commit.Operation {
210 case models.CommitOperationCreate, models.CommitOperationUpdate:
211 raw := json.RawMessage(e.Commit.Record)
212 record := tangled.ActorProfile{}
213 err = json.Unmarshal(raw, &record)
214 if err != nil {
215 log.Printf("invalid record: %s", err)
216 return err
217 }
218
219 description := ""
220 if record.Description != nil {
221 description = *record.Description
222 }
223
224 includeBluesky := record.Bluesky
225
226 location := ""
227 if record.Location != nil {
228 location = *record.Location
229 }
230
231 var links [5]string
232 for i, l := range record.Links {
233 if i < 5 {
234 links[i] = l
235 }
236 }
237
238 var stats [2]db.VanityStat
239 for i, s := range record.Stats {
240 if i < 2 {
241 stats[i].Kind = db.VanityStatKind(s)
242 }
243 }
244
245 var pinned [6]syntax.ATURI
246 for i, r := range record.PinnedRepositories {
247 if i < 6 {
248 pinned[i] = syntax.ATURI(r)
249 }
250 }
251
252 profile := db.Profile{
253 Did: did,
254 Description: description,
255 IncludeBluesky: includeBluesky,
256 Location: location,
257 Links: links,
258 Stats: stats,
259 PinnedRepos: pinned,
260 }
261
262 ddb, ok := d.Execer.(*db.DB)
263 if !ok {
264 return fmt.Errorf("failed to index profile record, invalid db cast")
265 }
266
267 tx, err := ddb.Begin()
268 if err != nil {
269 return fmt.Errorf("failed to start transaction")
270 }
271
272 err = db.ValidateProfile(tx, &profile)
273 if err != nil {
274 return fmt.Errorf("invalid profile record")
275 }
276
277 err = db.UpsertProfile(tx, &profile)
278 case models.CommitOperationDelete:
279 err = db.DeleteArtifact(d, db.Filter("did", did), db.Filter("rkey", e.Commit.RKey))
280 }
281
282 if err != nil {
283 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
284 }
285
286 return nil
287}