1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/bluesky-social/jetstream/pkg/models"
12 "github.com/go-git/go-git/v5/plumbing"
13 "github.com/ipfs/go-cid"
14 "tangled.sh/tangled.sh/core/api/tangled"
15 "tangled.sh/tangled.sh/core/appview/db"
16)
17
18type Ingester func(ctx context.Context, e *models.Event) error
19
20func Ingest(d db.DbWrapper) Ingester {
21 return func(ctx context.Context, e *models.Event) error {
22 var err error
23 defer func() {
24 eventTime := e.TimeUS
25 lastTimeUs := eventTime + 1
26 if err := d.SaveLastTimeUs(lastTimeUs); err != nil {
27 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
28 }
29 }()
30
31 if e.Kind != models.EventKindCommit {
32 return nil
33 }
34
35 switch e.Commit.Collection {
36 case tangled.GraphFollowNSID:
37 ingestFollow(&d, e)
38 case tangled.FeedStarNSID:
39 ingestStar(&d, e)
40 case tangled.PublicKeyNSID:
41 ingestPublicKey(&d, e)
42 case tangled.RepoArtifactNSID:
43 ingestArtifact(&d, e)
44 case tangled.ActorProfileNSID:
45 ingestProfile(&d, e)
46 }
47
48 return err
49 }
50}
51
52func ingestStar(d *db.DbWrapper, e *models.Event) error {
53 var err error
54 did := e.Did
55
56 switch e.Commit.Operation {
57 case models.CommitOperationCreate, models.CommitOperationUpdate:
58 var subjectUri syntax.ATURI
59
60 raw := json.RawMessage(e.Commit.Record)
61 record := tangled.FeedStar{}
62 err := json.Unmarshal(raw, &record)
63 if err != nil {
64 log.Println("invalid record")
65 return err
66 }
67
68 subjectUri, err = syntax.ParseATURI(record.Subject)
69 if err != nil {
70 log.Println("invalid record")
71 return err
72 }
73 err = db.AddStar(d, did, subjectUri, e.Commit.RKey)
74 case models.CommitOperationDelete:
75 err = db.DeleteStarByRkey(d, did, e.Commit.RKey)
76 }
77
78 if err != nil {
79 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
80 }
81
82 return nil
83}
84
85func ingestFollow(d *db.DbWrapper, e *models.Event) error {
86 var err error
87 did := e.Did
88
89 switch e.Commit.Operation {
90 case models.CommitOperationCreate, models.CommitOperationUpdate:
91 raw := json.RawMessage(e.Commit.Record)
92 record := tangled.GraphFollow{}
93 err = json.Unmarshal(raw, &record)
94 if err != nil {
95 log.Println("invalid record")
96 return err
97 }
98
99 subjectDid := record.Subject
100 err = db.AddFollow(d, did, subjectDid, e.Commit.RKey)
101 case models.CommitOperationDelete:
102 err = db.DeleteFollowByRkey(d, did, e.Commit.RKey)
103 }
104
105 if err != nil {
106 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
107 }
108
109 return nil
110}
111
112func ingestPublicKey(d *db.DbWrapper, e *models.Event) error {
113 did := e.Did
114 var err error
115
116 switch e.Commit.Operation {
117 case models.CommitOperationCreate, models.CommitOperationUpdate:
118 log.Println("processing add of pubkey")
119 raw := json.RawMessage(e.Commit.Record)
120 record := tangled.PublicKey{}
121 err = json.Unmarshal(raw, &record)
122 if err != nil {
123 log.Printf("invalid record: %s", err)
124 return err
125 }
126
127 name := record.Name
128 key := record.Key
129 err = db.AddPublicKey(d, did, name, key, e.Commit.RKey)
130 case models.CommitOperationDelete:
131 log.Println("processing delete of pubkey")
132 err = db.DeletePublicKeyByRkey(d, did, e.Commit.RKey)
133 }
134
135 if err != nil {
136 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
137 }
138
139 return nil
140}
141
142func ingestArtifact(d *db.DbWrapper, e *models.Event) error {
143 did := e.Did
144 var err error
145
146 switch e.Commit.Operation {
147 case models.CommitOperationCreate, models.CommitOperationUpdate:
148 raw := json.RawMessage(e.Commit.Record)
149 record := tangled.RepoArtifact{}
150 err = json.Unmarshal(raw, &record)
151 if err != nil {
152 log.Printf("invalid record: %s", err)
153 return err
154 }
155
156 repoAt, err := syntax.ParseATURI(record.Repo)
157 if err != nil {
158 return err
159 }
160
161 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
162 if err != nil {
163 createdAt = time.Now()
164 }
165
166 artifact := db.Artifact{
167 Did: did,
168 Rkey: e.Commit.RKey,
169 RepoAt: repoAt,
170 Tag: plumbing.Hash(record.Tag),
171 CreatedAt: createdAt,
172 BlobCid: cid.Cid(record.Artifact.Ref),
173 Name: record.Name,
174 Size: uint64(record.Artifact.Size),
175 MimeType: record.Artifact.MimeType,
176 }
177
178 err = db.AddArtifact(d, artifact)
179 case models.CommitOperationDelete:
180 err = db.DeleteArtifact(d, db.Filter("did", did), db.Filter("rkey", e.Commit.RKey))
181 }
182
183 if err != nil {
184 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
185 }
186
187 return nil
188}
189
190func ingestProfile(d *db.DbWrapper, e *models.Event) error {
191 did := e.Did
192 var err error
193
194 if e.Commit.RKey != "self" {
195 return fmt.Errorf("ingestProfile only ingests `self` record")
196 }
197
198 switch e.Commit.Operation {
199 case models.CommitOperationCreate, models.CommitOperationUpdate:
200 raw := json.RawMessage(e.Commit.Record)
201 record := tangled.ActorProfile{}
202 err = json.Unmarshal(raw, &record)
203 if err != nil {
204 log.Printf("invalid record: %s", err)
205 return err
206 }
207
208 description := ""
209 if record.Description != nil {
210 description = *record.Description
211 }
212
213 includeBluesky := false
214 if record.Bluesky != nil {
215 includeBluesky = *record.Bluesky
216 }
217
218 location := ""
219 if record.Location != nil {
220 location = *record.Location
221 }
222
223 var links [5]string
224 for i, l := range record.Links {
225 if i < 5 {
226 links[i] = l
227 }
228 }
229
230 var stats [2]db.VanityStat
231 for i, s := range record.Stats {
232 if i < 2 {
233 stats[i].Kind = db.VanityStatKind(s)
234 }
235 }
236
237 var pinned [6]syntax.ATURI
238 for i, r := range record.PinnedRepositories {
239 if i < 6 {
240 pinned[i] = syntax.ATURI(r)
241 }
242 }
243
244 profile := db.Profile{
245 Did: did,
246 Description: description,
247 IncludeBluesky: includeBluesky,
248 Location: location,
249 Links: links,
250 Stats: stats,
251 PinnedRepos: pinned,
252 }
253
254 ddb, ok := d.Execer.(*db.DB)
255 if !ok {
256 return fmt.Errorf("failed to index profile record, invalid db cast")
257 }
258
259 tx, err := ddb.Begin()
260 if err != nil {
261 return fmt.Errorf("failed to start transaction")
262 }
263
264 err = db.ValidateProfile(tx, &profile)
265 if err != nil {
266 return fmt.Errorf("invalid profile record")
267 }
268
269 err = db.UpsertProfile(tx, &profile)
270 case models.CommitOperationDelete:
271 err = db.DeleteArtifact(d, db.Filter("did", did), db.Filter("rkey", e.Commit.RKey))
272 }
273
274 if err != nil {
275 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
276 }
277
278 return nil
279}